001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024import com.google.protobuf.Message; 025import com.google.protobuf.RpcChannel; 026import edu.umd.cs.findbugs.annotations.Nullable; 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.Set; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.function.BiConsumer; 043import java.util.function.Consumer; 044import java.util.function.Function; 045import 
java.util.function.Supplier; 046import java.util.regex.Pattern; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.AsyncMetaTableAccessor; 052import org.apache.hadoop.hbase.CacheEvictionStats; 053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.HRegionLocation; 059import org.apache.hadoop.hbase.MetaTableAccessor; 060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; 061import org.apache.hadoop.hbase.NamespaceDescriptor; 062import org.apache.hadoop.hbase.RegionLocations; 063import org.apache.hadoop.hbase.RegionMetrics; 064import org.apache.hadoop.hbase.RegionMetricsBuilder; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableExistsException; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.TableNotDisabledException; 069import org.apache.hadoop.hbase.TableNotEnabledException; 070import org.apache.hadoop.hbase.TableNotFoundException; 071import org.apache.hadoop.hbase.UnknownRegionException; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 075import org.apache.hadoop.hbase.client.Scan.ReadType; 076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 077import org.apache.hadoop.hbase.client.replication.TableCFs; 078import org.apache.hadoop.hbase.client.security.SecurityCapability; 079import org.apache.hadoop.hbase.exceptions.DeserializationException; 080import 
org.apache.hadoop.hbase.ipc.HBaseRpcController; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.replication.ReplicationException; 086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 089import org.apache.hadoop.hbase.security.access.Permission; 090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 091import org.apache.hadoop.hbase.security.access.UserPermission; 092import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 093import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 105import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 126import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 146import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 166import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos 192 .IsSnapshotCleanupEnabledResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 206import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 226import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos 244 .SetSnapshotCleanupResponse; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 267import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 285import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 
286import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 287import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 288import org.apache.hadoop.hbase.util.Bytes; 289import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 290import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 291import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 292import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 293import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 294import org.apache.hbase.thirdparty.io.netty.util.Timeout; 295import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 296import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 297import org.apache.yetus.audience.InterfaceAudience; 298import org.slf4j.Logger; 299import org.slf4j.LoggerFactory; 300 301/** 302 * The implementation of AsyncAdmin. 303 * <p> 304 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 305 * be finished inside the rpc framework thread, which means that the callbacks registered to the 306 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 307 * this class should not try to do time consuming tasks in the callbacks. 
308 * @since 2.0.0 309 * @see AsyncHBaseAdmin 310 * @see AsyncConnection#getAdmin() 311 * @see AsyncConnection#getAdminBuilder() 312 */ 313@InterfaceAudience.Private 314class RawAsyncHBaseAdmin implements AsyncAdmin { 315 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 316 317 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 318 319 private final AsyncConnectionImpl connection; 320 321 private final HashedWheelTimer retryTimer; 322 323 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 324 325 private final long rpcTimeoutNs; 326 327 private final long operationTimeoutNs; 328 329 private final long pauseNs; 330 331 private final long pauseForCQTBENs; 332 333 private final int maxAttempts; 334 335 private final int startLogErrorsCnt; 336 337 private final NonceGenerator ng; 338 339 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 340 AsyncAdminBuilderBase builder) { 341 this.connection = connection; 342 this.retryTimer = retryTimer; 343 this.metaTable = connection.getTable(META_TABLE_NAME); 344 this.rpcTimeoutNs = builder.rpcTimeoutNs; 345 this.operationTimeoutNs = builder.operationTimeoutNs; 346 this.pauseNs = builder.pauseNs; 347 if (builder.pauseForCQTBENs < builder.pauseNs) { 348 LOG.warn( 349 "Configured value of pauseForCQTBENs is {} ms, which is less than" + 350 " the normal pause value {} ms, use the greater one instead", 351 TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), 352 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 353 this.pauseForCQTBENs = builder.pauseNs; 354 } else { 355 this.pauseForCQTBENs = builder.pauseForCQTBENs; 356 } 357 this.maxAttempts = builder.maxAttempts; 358 this.startLogErrorsCnt = builder.startLogErrorsCnt; 359 this.ng = connection.getNonceGenerator(); 360 } 361 362 private <T> MasterRequestCallerBuilder<T> newMasterCaller() { 363 return this.connection.callerFactory.<T> masterRequest() 364 
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 365 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 366 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 367 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 368 } 369 370 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 371 return this.connection.callerFactory.<T> adminRequest() 372 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 373 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 374 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 375 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 376 } 377 378 @FunctionalInterface 379 private interface MasterRpcCall<RESP, REQ> { 380 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 381 RpcCallback<RESP> done); 382 } 383 384 @FunctionalInterface 385 private interface AdminRpcCall<RESP, REQ> { 386 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 387 RpcCallback<RESP> done); 388 } 389 390 @FunctionalInterface 391 private interface Converter<D, S> { 392 D convert(S src) throws IOException; 393 } 394 395 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 396 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 397 Converter<RESP, PRESP> respConverter) { 398 CompletableFuture<RESP> future = new CompletableFuture<>(); 399 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 400 401 @Override 402 public void run(PRESP resp) { 403 if (controller.failed()) { 404 future.completeExceptionally(controller.getFailed()); 405 } else { 406 try { 407 future.complete(respConverter.convert(resp)); 408 } catch (IOException e) { 409 future.completeExceptionally(e); 410 } 411 } 412 } 413 }); 414 return future; 415 } 416 417 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 418 
AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
      Converter<RESP, PRESP> respConverter) {
    CompletableFuture<RESP> future = new CompletableFuture<>();
    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {

      @Override
      public void run(PRESP resp) {
        if (controller.failed()) {
          future.completeExceptionally(controller.getFailed());
        } else {
          try {
            future.complete(respConverter.convert(resp));
          } catch (IOException e) {
            future.completeExceptionally(e);
          }
        }
      }
    });
    return future;
  }

  /**
   * Runs a master procedure RPC without setting any priority on the caller builder (the
   * priority setter passed down is a no-op).
   */
  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
      ProcedureBiConsumer consumer) {
    return procedureCall(b -> {
    }, preq, rpcCall, respConverter, consumer);
  }

  /**
   * Runs a master procedure RPC, passing {@code tableName} to the caller builder's
   * {@code priority(...)} before the call is issued.
   */
  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
      ProcedureBiConsumer consumer) {
    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
  }

  /**
   * Common implementation of the {@code procedureCall} overloads.
   * <p>
   * Builds a master caller whose action sends {@code preq} through {@code rpcCall} and converts
   * the response to a {@code Long} with {@code respConverter}, lets {@code prioritySetter}
   * adjust the builder, then hands the resulting future to {@code waitProcedureResult}.
   * {@code consumer} is registered as a listener on the future that is returned.
   * @param prioritySetter callback applied to the caller builder before {@code call()}
   * @param preq the protobuf request to send
   * @param rpcCall the stub method to invoke
   * @param respConverter converts the RPC response into the value passed to
   *          {@code waitProcedureResult}
   * @param consumer listener attached to the returned future
   * @return the future produced by {@code waitProcedureResult}
   */
  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
      ProcedureBiConsumer consumer) {
    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
    prioritySetter.accept(builder);
    CompletableFuture<Long> procFuture = builder.call();
    CompletableFuture<Void> future = waitProcedureResult(procFuture);
    addListener(future, consumer);
    return future;
  }

  /** An asynchronous operation applied to a single table. */
  @FunctionalInterface
  private interface TableOperator {
    CompletableFuture<Void> operate(TableName table);
  }

  /**
   * Reports whether a table exists. The meta table is answered with {@code true} directly;
   * any other table is looked up through {@code AsyncMetaTableAccessor.tableExists}.
   */
  @Override
  public CompletableFuture<Boolean> tableExists(TableName tableName) {
    if (TableName.isMetaTableName(tableName)) {
      return CompletableFuture.completedFuture(true);
    }
    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
  }

  /** Lists table descriptors using a request built with a {@code null} pattern. */
  @Override
  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
      includeSysTables));
  }

  /**
   * Pattern-filtered variant of {@link #listTableDescriptors(boolean)}. Rejects a {@code null}
   * pattern up front.
   */
  @Override
  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
      boolean includeSysTables) {
    // NOTE(review): the message names listTables(boolean); the pattern-less overload visible
    // here is listTableDescriptors(boolean) - confirm whether the message is stale.
    Preconditions.checkNotNull(pattern,
      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
      includeSysTables));
  }

  /**
   * Lists descriptors for the given tables. A {@code null} list is rejected; an empty list
   * yields an already-completed future holding an empty list and no RPC is issued.
   */
  @Override
  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
    // NOTE(review): same listTables(boolean) wording as the Pattern overload - confirm.
    Preconditions.checkNotNull(tableNames,
      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
    if (tableNames.isEmpty()) {
      return CompletableFuture.completedFuture(Collections.emptyList());
    }
    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
  }

  /**
   * Sends {@code request} to the master's {@code getTableDescriptors} RPC and converts the
   * response with {@code ProtobufUtil.toTableDescriptorList}.
   */
  private CompletableFuture<List<TableDescriptor>>
      getTableDescriptors(GetTableDescriptorsRequest request) {
    return this.<List<TableDescriptor>> newMasterCaller()
        .action((controller, stub) -> this
            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
        .call();
  }

  /** Lists table names using a request built with a {@code null} pattern. */
  @Override
  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
  }

  /** Pattern-filtered variant of {@link #listTableNames(boolean)}; rejects a null pattern. */
  @Override
  public CompletableFuture<List<TableName>>
      listTableNames(Pattern pattern, boolean includeSysTables) {
    Preconditions.checkNotNull(pattern,
        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
  }

  /**
   * Sends {@code request} to the master's {@code getTableNames} RPC and converts the returned
   * names with {@code ProtobufUtil.toTableNameList}.
   */
  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
    return this
        .<List<TableName>> newMasterCaller()
        .action(
          (controller, stub) -> this
              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
  }

  /** Lists the table descriptors of namespace {@code name} via the master. */
  @Override
  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
        List<TableDescriptor>> call(
          controller, stub,
          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
        .call();
  }

  /** Lists the table names of namespace {@code name} via the master. */
  @Override
  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
        List<TableName>> call(
          controller, stub,
          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
        .call();
  }

  @Override
  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
    addListener(this.<List<TableSchema>> 
newMasterCaller().priority(tableName)
        .action((controller, stub) -> this
            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
              controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
              (s, c, req, done) -> s.getTableDescriptors(c, req, done),
              (resp) -> resp.getTableSchemaList()))
        .call(), (tableSchemas, error) -> {
          if (error != null) {
            future.completeExceptionally(error);
            return;
          }
          // Only the first schema is used; an empty list is reported as a missing table.
          if (!tableSchemas.isEmpty()) {
            future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
          } else {
            future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
          }
        });
    return future;
  }

  /** Creates a table from {@code desc}, building the request with {@code null} split keys. */
  @Override
  public CompletableFuture<Void> createTable(TableDescriptor desc) {
    return createTable(desc.getTableName(),
      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
  }

  /**
   * Creates a table whose split keys are computed by {@code getSplitKeys} from the key range
   * and region count. An {@link IllegalArgumentException} raised while doing so is returned
   * through {@code failedFuture} instead of being thrown to the caller.
   */
  @Override
  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
      int numRegions) {
    try {
      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
    } catch (IllegalArgumentException e) {
      return failedFuture(e);
    }
  }

  /**
   * Creates a table with explicit split keys. A {@code null} array is rejected by the
   * precondition; the keys are then checked with {@code verifySplitKeys}, and an
   * {@link IllegalArgumentException} from that check or from building the request is returned
   * through {@code failedFuture}.
   */
  @Override
  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
      + " use createTable(TableDescriptor) instead");
    try {
      verifySplitKeys(splitKeys);
      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
        splitKeys, ng.getNonceGroup(), ng.newNonce()));
    } catch (IllegalArgumentException e) {
      return failedFuture(e);
    }
  }

  /**
   * Submits an already-built create-table request as a master procedure, with
   * {@code CreateTableProcedureBiConsumer} attached as the completion listener.
   */
  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
    Preconditions.checkNotNull(tableName, "table name is null");
    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
      new CreateTableProcedureBiConsumer(tableName));
  }

  /** Submits a modify-table procedure for the table named in {@code desc}. */
  @Override
  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
  }

  /** Submits a delete-table procedure for {@code tableName}. */
  @Override
  public CompletableFuture<Void> deleteTable(TableName tableName) {
    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
      new DeleteTableProcedureBiConsumer(tableName));
  }

  /** Submits a truncate-table procedure; {@code preserveSplits} is forwarded in the request. */
  @Override
  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
      (resp) -> 
resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName)); 644 } 645 646 @Override 647 public CompletableFuture<Void> enableTable(TableName tableName) { 648 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 649 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 650 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 651 new EnableTableProcedureBiConsumer(tableName)); 652 } 653 654 @Override 655 public CompletableFuture<Void> disableTable(TableName tableName) { 656 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 657 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 658 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 659 new DisableTableProcedureBiConsumer(tableName)); 660 } 661 662 /** 663 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> 664 * using passed parameters. Sets error or boolean result ('true' if table matches 665 * the passed-in targetState). 
666 */ 667 private static CompletableFuture<Boolean> completeCheckTableState( 668 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 669 TableState.State targetState, TableName tableName) { 670 if (error != null) { 671 future.completeExceptionally(error); 672 } else { 673 if (tableState != null) { 674 future.complete(tableState.inStates(targetState)); 675 } else { 676 future.completeExceptionally(new TableNotFoundException(tableName)); 677 } 678 } 679 return future; 680 } 681 682 @Override 683 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 684 if (TableName.isMetaTableName(tableName)) { 685 return CompletableFuture.completedFuture(true); 686 } 687 CompletableFuture<Boolean> future = new CompletableFuture<>(); 688 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 689 completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error, 690 TableState.State.ENABLED, tableName); 691 }); 692 return future; 693 } 694 695 @Override 696 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 697 if (TableName.isMetaTableName(tableName)) { 698 return CompletableFuture.completedFuture(false); 699 } 700 CompletableFuture<Boolean> future = new CompletableFuture<>(); 701 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 702 completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error, 703 TableState.State.DISABLED, tableName); 704 }); 705 return future; 706 } 707 708 @Override 709 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 710 return isTableAvailable(tableName, Optional.empty()); 711 } 712 713 @Override 714 public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) { 715 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 716 + " use isTableAvailable(TableName) instead"); 717 return isTableAvailable(tableName, Optional.of(splitKeys)); 718 } 719 720 private CompletableFuture<Boolean> isTableAvailable(TableName tableName, 721 Optional<byte[][]> splitKeys) { 722 if (TableName.isMetaTableName(tableName)) { 723 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 724 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 725 } 726 CompletableFuture<Boolean> future = new CompletableFuture<>(); 727 addListener(isTableEnabled(tableName), (enabled, error) -> { 728 if (error != null) { 729 if (error instanceof TableNotFoundException) { 730 future.complete(false); 731 } else { 732 future.completeExceptionally(error); 733 } 734 return; 735 } 736 if (!enabled) { 737 future.complete(false); 738 } else { 739 addListener( 740 AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 741 (locations, error1) -> { 742 if (error1 != null) { 743 future.completeExceptionally(error1); 744 return; 745 } 746 List<HRegionLocation> notDeployedRegions = locations.stream() 747 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 748 if (notDeployedRegions.size() > 0) { 749 if (LOG.isDebugEnabled()) { 750 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 751 } 752 future.complete(false); 753 return; 754 } 755 756 Optional<Boolean> available = 757 splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); 758 future.complete(available.orElse(true)); 759 }); 760 } 761 }); 762 return future; 763 } 764 765 private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) { 766 int regionCount = 0; 767 for (HRegionLocation location : locations) { 768 RegionInfo info = location.getRegion(); 769 if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 770 regionCount++; 771 continue; 772 } 773 for (byte[] 
splitKey : splitKeys) {
        // Just check if the splitkey is available
        if (Bytes.equals(info.getStartKey(), splitKey)) {
          regionCount++;
          break;
        }
      }
    }
    // One region per split key plus the region that starts at the empty key.
    return regionCount == splitKeys.length + 1;
  }

  /** Submits an add-column-family procedure for {@code tableName}. */
  @Override
  public CompletableFuture<Void> addColumnFamily(
      TableName tableName, ColumnFamilyDescriptor columnFamily) {
    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
      new AddColumnFamilyProcedureBiConsumer(tableName));
  }

  /** Submits a delete-column-family procedure for {@code tableName}. */
  @Override
  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
  }

  /** Submits a modify-column-family procedure for {@code tableName}. */
  @Override
  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
      ColumnFamilyDescriptor columnFamily) {
    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
  }

  /** Submits a create-namespace procedure; no table priority is set on the call. */
  @Override
  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
      RequestConverter.buildCreateNamespaceRequest(descriptor),
      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
  }

  /** Submits a modify-namespace procedure; no table priority is set on the call. */
  @Override
  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
      RequestConverter.buildModifyNamespaceRequest(descriptor),
      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
  }

  /** Submits a delete-namespace procedure; no table priority is set on the call. */
  @Override
  public CompletableFuture<Void> deleteNamespace(String name) {
    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
      RequestConverter.buildDeleteNamespaceRequest(name),
      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
      new DeleteNamespaceProcedureBiConsumer(name));
  }

  /** Fetches the descriptor of namespace {@code name} from the master. */
  @Override
  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
    return this
        .<NamespaceDescriptor> newMasterCaller()
        .action(
          (controller, stub) -> this
              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
                    -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
  }

  /** Returns the namespace name list from the master's {@code listNamespaces} response. */
  @Override
  public CompletableFuture<List<String>> listNamespaces() {
    return this
        .<List<String>> newMasterCaller()
        .action(
          (controller, stub) -> this
              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
                  done) -> s.listNamespaces(c, req, done),
                (resp) -> resp.getNamespaceNameList())).call();
  }

  @Override
  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
    return this
        .<List<NamespaceDescriptor>> 
newMasterCaller().action((controller, stub) -> this
        .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
        List<NamespaceDescriptor>> call(controller, stub,
          ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
            s.listNamespaceDescriptors(c, req, done),
          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
  }

  /** Asks region server {@code serverName} for its online regions. */
  @Override
  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
    return this.<List<RegionInfo>> newAdminCaller()
        .action((controller, stub) -> this
            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
              resp -> ProtobufUtil.getRegionInfos(resp)))
        .serverName(serverName).call();
  }

  /**
   * Lists the regions of a table. Meta regions come from the connection registry; regions of
   * any other table come from {@code AsyncMetaTableAccessor.getTableHRegionLocations}.
   */
  @Override
  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
    if (tableName.equals(META_TABLE_NAME)) {
      return connection.registry.getMetaRegionLocations()
        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
          .collect(Collectors.toList()));
    } else {
      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
        .thenApply(
          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
    }
  }

  /** Flushes a table without restricting the flush to a column family. */
  @Override
  public CompletableFuture<Void> flush(TableName tableName) {
    return flush(tableName, null);
  }

  /**
   * Flushes a table through {@code execProcedure} with {@code FLUSH_TABLE_PROCEDURE_SIGNATURE}.
   * <p>
   * The table must exist ({@link TableNotFoundException} otherwise) and be enabled
   * ({@link TableNotEnabledException} otherwise). When {@code columnFamily} is non-null it is
   * passed to the procedure under {@code HConstants.FAMILY_KEY_STR}.
   */
  @Override
  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(tableExists(tableName), (exists, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
      } else if (!exists) {
        future.completeExceptionally(new TableNotFoundException(tableName));
      } else {
        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else if (!tableEnabled) {
            future.completeExceptionally(new TableNotEnabledException(tableName));
          } else {
            Map<String, String> props = new HashMap<>();
            if (columnFamily != null) {
              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
            }
            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
              props), (ret, err3) -> {
                if (err3 != null) {
                  future.completeExceptionally(err3);
                } else {
                  future.complete(ret);
                }
              });
          }
        });
      }
    });
    return future;
  }

  /** Flushes a region without restricting the flush to a column family. */
  @Override
  public CompletableFuture<Void> flushRegion(byte[] regionName) {
    return flushRegion(regionName, null);
  }

  /**
   * Flushes one region: resolves its location, fails with {@code NoServerForRegionException}
   * when the location has no server, and otherwise sends the flush to that server.
   */
  @Override
  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(getRegionLocation(regionName), (location, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      ServerName serverName = location.getServerName();
      if (serverName == null) {
        future
          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
        return;
      }
      addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
        } else {
          future.complete(ret);
        }
      });
    });
    return future;
  }

  /**
   * Sends a flush-region request for {@code regionInfo} to {@code serverName}. The response
   * body is discarded; the future completes with {@code null}.
   */
  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo,
      byte[] columnFamily) {
    return this.<Void> newAdminCaller()
        .serverName(serverName)
        .action(
          (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
            controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
              .getRegionName(), columnFamily, false),
            (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null))
        .call();
  }

  /**
   * Flushes every region reported online by {@code sn}. The returned future completes once
   * all per-region flushes have completed, or fails if the region listing or any flush fails.
   */
  @Override
  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(getRegions(sn), (hRegionInfos, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      // Despite the name, these are the per-region flush futures.
      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
      if (hRegionInfos != null) {
        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null)));
      }
      addListener(CompletableFuture.allOf(
        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            future.complete(ret);
          }
        });
    });
    return future;
  }

  /** Requests a non-major compaction of every column family of {@code tableName}. */
  @Override
  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
    return compact(tableName, null, false, compactType);
  }

  /** Requests a non-major compaction of one column family; a null family is rejected. */
  @Override
  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
      CompactType compactType) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
        + "If you don't specify a columnFamily, use compact(TableName) instead");
    return compact(tableName, columnFamily, false, compactType);
  }

  /** Requests a non-major compaction of a region across all of its column families. */
  @Override
  public CompletableFuture<Void> compactRegion(byte[] regionName) {
    return compactRegion(regionName, null, false);
  }

  @Override
  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
1018 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1019 return compactRegion(regionName, columnFamily, false); 1020 } 1021 1022 @Override 1023 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1024 return compact(tableName, null, true, compactType); 1025 } 1026 1027 @Override 1028 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1029 CompactType compactType) { 1030 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1031 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1032 return compact(tableName, columnFamily, true, compactType); 1033 } 1034 1035 @Override 1036 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1037 return compactRegion(regionName, null, true); 1038 } 1039 1040 @Override 1041 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1042 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
1043 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1044 return compactRegion(regionName, columnFamily, true); 1045 } 1046 1047 @Override 1048 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1049 return compactRegionServer(sn, false); 1050 } 1051 1052 @Override 1053 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1054 return compactRegionServer(sn, true); 1055 } 1056 1057 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1058 CompletableFuture<Void> future = new CompletableFuture<>(); 1059 addListener(getRegions(sn), (hRegionInfos, err) -> { 1060 if (err != null) { 1061 future.completeExceptionally(err); 1062 return; 1063 } 1064 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1065 if (hRegionInfos != null) { 1066 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1067 } 1068 addListener(CompletableFuture.allOf( 1069 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1070 if (err2 != null) { 1071 future.completeExceptionally(err2); 1072 } else { 1073 future.complete(ret); 1074 } 1075 }); 1076 }); 1077 return future; 1078 } 1079 1080 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1081 boolean major) { 1082 CompletableFuture<Void> future = new CompletableFuture<>(); 1083 addListener(getRegionLocation(regionName), (location, err) -> { 1084 if (err != null) { 1085 future.completeExceptionally(err); 1086 return; 1087 } 1088 ServerName serverName = location.getServerName(); 1089 if (serverName == null) { 1090 future 1091 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1092 return; 1093 } 1094 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1095 (ret, err2) -> { 1096 if (err2 != null) { 1097 future.completeExceptionally(err2); 1098 } else { 
future.complete(ret);
          }
        });
    });
    return future;
  }

  /**
   * List all region locations for the specific table.
   * <p>
   * For the meta table only the default region location from the connection registry is
   * returned (as a singleton list); the future fails with an {@link IOException} when that
   * location is missing.
   */
  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
    if (TableName.META_TABLE_NAME.equals(tableName)) {
      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
        if (err != null) {
          future.completeExceptionally(err);
        } else if (metaRegions == null || metaRegions.isEmpty() ||
          metaRegions.getDefaultRegionLocation() == null) {
          future.completeExceptionally(new IOException("meta region does not found"));
        } else {
          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
        }
      });
      return future;
    } else {
      // For non-meta table, we fetch all locations by scanning hbase:meta table
      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
    }
  }

  /**
   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
   * <ul>
   * <li>{@code MOB}: a single compaction request for the table's MOB region info is sent to
   * the active master.</li>
   * <li>{@code NORMAL}: one request is sent per region that has a region info, is not offline
   * and has a server; the future completes when all of them have completed. A null or empty
   * location list fails the future with {@link TableNotFoundException}.</li>
   * </ul>
   * @throws IllegalArgumentException synchronously, for any other compact type
   */
  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
      CompactType compactType) {
    CompletableFuture<Void> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              future.complete(ret);
            }
          });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          if (locations == null || locations.isEmpty()) {
            future.completeExceptionally(new TableNotFoundException(tableName));
            // Stop here: without this the code below dereferenced a null list, or scheduled
            // an empty allOf on a future that had already failed.
            return;
          }
          CompletableFuture<?>[] compactFutures =
            locations.stream().filter(l -> l.getRegion() != null)
              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
              .toArray(CompletableFuture<?>[]::new);
          // future complete unless all of the compact futures are completed.
          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              future.complete(ret);
            }
          });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }
    return future;
  }

  /**
   * Compact the region at specific region server.
   * The response body is discarded; the future completes with {@code null}.
   */
  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
      final boolean major, byte[] columnFamily) {
    return this
        .<Void> newAdminCaller()
        .serverName(sn)
        .action(
          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
            resp -> null)).call();
  }

  private byte[] toEncodeRegionName(byte[] regionName) {
    return RegionInfo.isEncodedRegionName(regionName) ? 
regionName :
      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
  }

  /**
   * Resolves one region and folds its table into the shared {@code tableName} reference.
   * <p>
   * {@code result} fails when the lookup fails, when the region is not the default replica,
   * or when its table differs from the one already stored in {@code tableName}. The call that
   * stores the first table (wins the compare-and-set) does not complete {@code result};
   * a later call that finds a matching table does.
   */
  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
      CompletableFuture<TableName> result) {
    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
      if (err != null) {
        result.completeExceptionally(err);
        return;
      }
      RegionInfo regionInfo = location.getRegion();
      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
        result.completeExceptionally(
          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
        return;
      }
      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
        if (!tableName.get().equals(regionInfo.getTable())) {
          // tables of this two region should be same.
          result.completeExceptionally(
            new IllegalArgumentException("Cannot merge regions from two different tables " +
              tableName.get() + " and " + regionInfo.getTable()));
        } else {
          result.complete(tableName.get());
        }
      }
    });
  }

  /**
   * Runs {@link #checkAndGetTableName} for every region name against one shared reference and
   * one shared future, which is returned.
   */
  // NOTE(review): the shared future is completed by the first lookup that finds a matching
  // table, so with more than two regions later mismatches may go unreported - confirm intent.
  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
    CompletableFuture<TableName> future = new CompletableFuture<>();
    for (byte[] encodedRegionName : encodedRegionNames) {
      checkAndGetTableName(encodedRegionName, tableNameRef, future);
    }
    return future;
  }

  /** Sets the master's MERGE switch; see {@link #setSplitOrMergeOn}. */
  @Override
  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
  }

  /** Queries the master's MERGE switch. */
  @Override
  public CompletableFuture<Boolean> isMergeEnabled() {
    return isSplitOrMergeOn(MasterSwitchType.MERGE);
  }

  /** Sets the master's SPLIT switch; see {@link #setSplitOrMergeOn}. */
  @Override
  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
  }

  /** Queries the master's SPLIT switch. */
  @Override
  public CompletableFuture<Boolean> isSplitEnabled() {
    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
  }

  /**
   * Sends a set-split-or-merge-enabled request for a single switch type and returns the first
   * entry of the response's previous-value list.
   */
  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
      MasterSwitchType switchType) {
    SetSplitOrMergeEnabledRequest request =
      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
    return this.<Boolean> newMasterCaller()
        .action((controller, stub) -> this
            .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
              stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
              (resp) -> resp.getPrevValueList().get(0)))
        .call();
  }

  /** Asks the master whether the given switch type is enabled. */
  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
    IsSplitOrMergeEnabledRequest request =
        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
    return this
        .<Boolean> newMasterCaller()
        .action(
          (controller, stub) -> this
              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
                controller, stub, request,
                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
                (resp) -> resp.getEnabled())).call();
  }

  @Override
  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
    if (nameOfRegionsToMerge.size() < 2) {
      return failedFuture(new IllegalArgumentException(
        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    byte[][] encodedNameOfRegionsToMerge =
      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);

    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        
return;
      }

      final MergeTableRegionsRequest request;
      try {
        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
          forcible, ng.getNonceGroup(), ng.newNonce());
      } catch (DeserializationException e) {
        future.completeExceptionally(e);
        return;
      }

      addListener(
        this.procedureCall(tableName, request,
          MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
          new MergeTableRegionProcedureBiConsumer(tableName)),
        (ret, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            future.complete(ret);
          }
        });
    });
    return future;
  }

  /**
   * Requests a split (with no explicit split point) of every eligible region of a table.
   * <p>
   * The table must exist ({@link TableNotFoundException} otherwise). The table's rows in meta
   * are scanned; for each location that has a server, a split is requested unless the region
   * is a split parent or a non-default replica. The future completes when all requested splits
   * complete, or immediately with {@code null} when the scan returns nothing.
   */
  @Override
  public CompletableFuture<Void> split(TableName tableName) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(tableExists(tableName), (exist, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
        return;
      }
      if (!exist) {
        future.completeExceptionally(new TableNotFoundException(tableName));
        return;
      }
      addListener(
        metaTable
          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
        (results, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
            return;
          }
          if (results != null && !results.isEmpty()) {
            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
            for (Result r : results) {
              // Skip meta rows that carry no region info.
              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
                continue;
              }
              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
              if (rl != null) {
                for (HRegionLocation h : rl.getRegionLocations()) {
                  if (h != null && h.getServerName() != null) {
                    RegionInfo hri = h.getRegion();
                    if (hri == null || hri.isSplitParent() ||
                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
                      continue;
                    }
                    splitFutures.add(split(hri, null));
                  }
                }
              }
            }
            addListener(
              CompletableFuture
                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
              (ret, exception) -> {
                if (exception != null) {
                  future.completeExceptionally(exception);
                  return;
                }
                future.complete(ret);
              });
          } else {
            future.complete(null);
          }
        });
    });
    return future;
  }

  /**
   * Splits the region of {@code tableName} that contains {@code splitPoint}, at that point.
   * A null split point is reported through {@code failedFuture}; a location lookup that yields
   * no region fails the future with {@link IllegalArgumentException}.
   */
  @Override
  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    if (splitPoint == null) {
      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
    }
    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
      (loc, err) -> {
        if (err != null) {
          result.completeExceptionally(err);
        } else if (loc == null || loc.getRegion() == null) {
          result.completeExceptionally(new IllegalArgumentException(
            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
        } else {
          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
            if (err2 != null) {
              result.completeExceptionally(err2);
            } else {
              result.complete(ret);
            }

          });
        }
      });
    return result;
  }

  /**
   * Splits a region without an explicit split point. Non-default replicas are rejected with
   * {@link IllegalArgumentException}; a location without a server fails with
   * {@code NoServerForRegionException}.
   */
  @Override
  public CompletableFuture<Void> splitRegion(byte[] regionName) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(getRegionLocation(regionName), (location, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      RegionInfo regionInfo = location.getRegion();
      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
        future
          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
            "Replicas are auto-split when their primary is split."));
        return;
      }
      ServerName serverName = location.getServerName();
      if (serverName == null) {
        future
          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
        return;
      }
      addListener(split(regionInfo, null), (ret, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
        } else {
          future.complete(ret);
        }
      });
    });
    return future;
  }

  @Override
  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
    Preconditions.checkNotNull(splitPoint,
      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(getRegionLocation(regionName), (location, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      RegionInfo regionInfo = location.getRegion();
      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
        future
          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. 
" + 1456 "Replicas are auto-split when their primary is split.")); 1457 return; 1458 } 1459 ServerName serverName = location.getServerName(); 1460 if (serverName == null) { 1461 future 1462 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1463 return; 1464 } 1465 if (regionInfo.getStartKey() != null && 1466 Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) { 1467 future.completeExceptionally( 1468 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1469 return; 1470 } 1471 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1472 if (err2 != null) { 1473 future.completeExceptionally(err2); 1474 } else { 1475 future.complete(ret); 1476 } 1477 }); 1478 }); 1479 return future; 1480 } 1481 1482 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1483 CompletableFuture<Void> future = new CompletableFuture<>(); 1484 TableName tableName = hri.getTable(); 1485 final SplitTableRegionRequest request; 1486 try { 1487 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1488 ng.newNonce()); 1489 } catch (DeserializationException e) { 1490 future.completeExceptionally(e); 1491 return future; 1492 } 1493 1494 addListener( 1495 this.procedureCall(tableName, 1496 request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId, 1497 new SplitTableRegionProcedureBiConsumer(tableName)), 1498 (ret, err2) -> { 1499 if (err2 != null) { 1500 future.completeExceptionally(err2); 1501 } else { 1502 future.complete(ret); 1503 } 1504 }); 1505 return future; 1506 } 1507 1508 @Override 1509 public CompletableFuture<Void> assign(byte[] regionName) { 1510 CompletableFuture<Void> future = new CompletableFuture<>(); 1511 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1512 if (err != null) { 1513 future.completeExceptionally(err); 1514 return; 1515 } 1516 addListener(this.<Void> 
newMasterCaller().priority(regionInfo.getTable()) 1517 .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1518 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1519 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) 1520 .call(), (ret, err2) -> { 1521 if (err2 != null) { 1522 future.completeExceptionally(err2); 1523 } else { 1524 future.complete(ret); 1525 } 1526 }); 1527 }); 1528 return future; 1529 } 1530 1531 @Override 1532 public CompletableFuture<Void> unassign(byte[] regionName) { 1533 CompletableFuture<Void> future = new CompletableFuture<>(); 1534 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1535 if (err != null) { 1536 future.completeExceptionally(err); 1537 return; 1538 } 1539 addListener( 1540 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1541 .action(((controller, stub) -> this 1542 .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, 1543 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1544 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) 1545 .call(), 1546 (ret, err2) -> { 1547 if (err2 != null) { 1548 future.completeExceptionally(err2); 1549 } else { 1550 future.complete(ret); 1551 } 1552 }); 1553 }); 1554 return future; 1555 } 1556 1557 @Override 1558 public CompletableFuture<Void> offline(byte[] regionName) { 1559 CompletableFuture<Void> future = new CompletableFuture<>(); 1560 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1561 if (err != null) { 1562 future.completeExceptionally(err); 1563 return; 1564 } 1565 addListener( 1566 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1567 .action(((controller, stub) -> this 1568 .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub, 1569 RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1570 (s, c, req, done) -> s.offlineRegion(c, req, 
done), resp -> null))) 1571 .call(), 1572 (ret, err2) -> { 1573 if (err2 != null) { 1574 future.completeExceptionally(err2); 1575 } else { 1576 future.complete(ret); 1577 } 1578 }); 1579 }); 1580 return future; 1581 } 1582 1583 @Override 1584 public CompletableFuture<Void> move(byte[] regionName) { 1585 CompletableFuture<Void> future = new CompletableFuture<>(); 1586 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1587 if (err != null) { 1588 future.completeExceptionally(err); 1589 return; 1590 } 1591 addListener( 1592 moveRegion(regionInfo, 1593 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1594 (ret, err2) -> { 1595 if (err2 != null) { 1596 future.completeExceptionally(err2); 1597 } else { 1598 future.complete(ret); 1599 } 1600 }); 1601 }); 1602 return future; 1603 } 1604 1605 @Override 1606 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1607 Preconditions.checkNotNull(destServerName, 1608 "destServerName is null. 
If you don't specify a destServerName, use move(byte[]) instead"); 1609 CompletableFuture<Void> future = new CompletableFuture<>(); 1610 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1611 if (err != null) { 1612 future.completeExceptionally(err); 1613 return; 1614 } 1615 addListener( 1616 moveRegion(regionInfo, RequestConverter 1617 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1618 (ret, err2) -> { 1619 if (err2 != null) { 1620 future.completeExceptionally(err2); 1621 } else { 1622 future.complete(ret); 1623 } 1624 }); 1625 }); 1626 return future; 1627 } 1628 1629 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1630 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1631 .action( 1632 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1633 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1634 .call(); 1635 } 1636 1637 @Override 1638 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1639 return this 1640 .<Void> newMasterCaller() 1641 .action( 1642 (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1643 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1644 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call(); 1645 } 1646 1647 @Override 1648 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1649 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1650 Scan scan = QuotaTableUtil.makeScan(filter); 1651 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() 1652 .scan(scan, new AdvancedScanResultConsumer() { 1653 List<QuotaSettings> settings = new ArrayList<>(); 1654 1655 @Override 1656 public void onNext(Result[] results, ScanController controller) { 1657 for (Result result : results) { 1658 try { 1659 QuotaTableUtil.parseResultToCollection(result, 
settings); 1660 } catch (IOException e) { 1661 controller.terminate(); 1662 future.completeExceptionally(e); 1663 } 1664 } 1665 } 1666 1667 @Override 1668 public void onError(Throwable error) { 1669 future.completeExceptionally(error); 1670 } 1671 1672 @Override 1673 public void onComplete() { 1674 future.complete(settings); 1675 } 1676 }); 1677 return future; 1678 } 1679 1680 @Override 1681 public CompletableFuture<Void> addReplicationPeer(String peerId, 1682 ReplicationPeerConfig peerConfig, boolean enabled) { 1683 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1684 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1685 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1686 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1687 } 1688 1689 @Override 1690 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1691 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1692 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1693 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1694 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1695 } 1696 1697 @Override 1698 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1699 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1700 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1701 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1702 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1703 } 1704 1705 @Override 1706 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1707 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1708 RequestConverter.buildDisableReplicationPeerRequest(peerId), 
1709 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1710 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1711 } 1712 1713 @Override 1714 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1715 return this 1716 .<ReplicationPeerConfig> newMasterCaller() 1717 .action( 1718 (controller, stub) -> this 1719 .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call( 1720 controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( 1721 s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1722 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call(); 1723 } 1724 1725 @Override 1726 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1727 ReplicationPeerConfig peerConfig) { 1728 return this 1729 .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall( 1730 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1731 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1732 (resp) -> resp.getProcId(), 1733 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1734 } 1735 1736 @Override 1737 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1738 Map<TableName, List<String>> tableCfs) { 1739 if (tableCfs == null) { 1740 return failedFuture(new ReplicationException("tableCfs is null")); 1741 } 1742 1743 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1744 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1745 if (!completeExceptionally(future, error)) { 1746 ReplicationPeerConfig newPeerConfig = 1747 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1748 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1749 if (!completeExceptionally(future, 
error)) { 1750 future.complete(result); 1751 } 1752 }); 1753 } 1754 }); 1755 return future; 1756 } 1757 1758 @Override 1759 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1760 Map<TableName, List<String>> tableCfs) { 1761 if (tableCfs == null) { 1762 return failedFuture(new ReplicationException("tableCfs is null")); 1763 } 1764 1765 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1766 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1767 if (!completeExceptionally(future, error)) { 1768 ReplicationPeerConfig newPeerConfig = null; 1769 try { 1770 newPeerConfig = ReplicationPeerConfigUtil 1771 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1772 } catch (ReplicationException e) { 1773 future.completeExceptionally(e); 1774 return; 1775 } 1776 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1777 if (!completeExceptionally(future, error)) { 1778 future.complete(result); 1779 } 1780 }); 1781 } 1782 }); 1783 return future; 1784 } 1785 1786 @Override 1787 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1788 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1789 } 1790 1791 @Override 1792 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1793 Preconditions.checkNotNull(pattern, 1794 "pattern is null. 
If you don't specify a pattern, use listReplicationPeers() instead"); 1795 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1796 } 1797 1798 private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers( 1799 ListReplicationPeersRequest request) { 1800 return this 1801 .<List<ReplicationPeerDescription>> newMasterCaller() 1802 .action( 1803 (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1804 List<ReplicationPeerDescription>> call(controller, stub, request, 1805 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1806 (resp) -> resp.getPeerDescList().stream() 1807 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1808 .collect(Collectors.toList()))).call(); 1809 } 1810 1811 @Override 1812 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1813 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1814 addListener(listTableDescriptors(), (tables, error) -> { 1815 if (!completeExceptionally(future, error)) { 1816 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1817 tables.forEach(table -> { 1818 Map<String, Integer> cfs = new HashMap<>(); 1819 Stream.of(table.getColumnFamilies()) 1820 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1821 .forEach(column -> { 1822 cfs.put(column.getNameAsString(), column.getScope()); 1823 }); 1824 if (!cfs.isEmpty()) { 1825 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1826 } 1827 }); 1828 future.complete(replicatedTableCFs); 1829 } 1830 }); 1831 return future; 1832 } 1833 1834 @Override 1835 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1836 SnapshotProtos.SnapshotDescription snapshot = 1837 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1838 try { 1839 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1840 } catch (IllegalArgumentException e) { 1841 return 
failedFuture(e); 1842 } 1843 CompletableFuture<Void> future = new CompletableFuture<>(); 1844 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); 1845 addListener(this.<Long> newMasterCaller() 1846 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, 1847 stub, request, (s, c, req, done) -> s.snapshot(c, req, done), 1848 resp -> resp.getExpectedTimeout())) 1849 .call(), (expectedTimeout, err) -> { 1850 if (err != null) { 1851 future.completeExceptionally(err); 1852 return; 1853 } 1854 TimerTask pollingTask = new TimerTask() { 1855 int tries = 0; 1856 long startTime = EnvironmentEdgeManager.currentTime(); 1857 long endTime = startTime + expectedTimeout; 1858 long maxPauseTime = expectedTimeout / maxAttempts; 1859 1860 @Override 1861 public void run(Timeout timeout) throws Exception { 1862 if (EnvironmentEdgeManager.currentTime() < endTime) { 1863 addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> { 1864 if (err2 != null) { 1865 future.completeExceptionally(err2); 1866 } else if (done) { 1867 future.complete(null); 1868 } else { 1869 // retry again after pauseTime. 
1870 long pauseTime = 1871 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 1872 pauseTime = Math.min(pauseTime, maxPauseTime); 1873 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 1874 TimeUnit.MILLISECONDS); 1875 } 1876 }); 1877 } else { 1878 future.completeExceptionally( 1879 new SnapshotCreationException("Snapshot '" + snapshot.getName() + 1880 "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc)); 1881 } 1882 } 1883 }; 1884 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 1885 }); 1886 return future; 1887 } 1888 1889 @Override 1890 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 1891 return this 1892 .<Boolean> newMasterCaller() 1893 .action( 1894 (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call( 1895 controller, 1896 stub, 1897 IsSnapshotDoneRequest.newBuilder() 1898 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 1899 req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call(); 1900 } 1901 1902 @Override 1903 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 1904 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 1905 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 1906 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 1907 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 1908 } 1909 1910 @Override 1911 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 1912 boolean restoreAcl) { 1913 CompletableFuture<Void> future = new CompletableFuture<>(); 1914 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 1915 if (err != null) { 1916 future.completeExceptionally(err); 1917 return; 1918 } 1919 TableName tableName = null; 1920 if (snapshotDescriptions != null && 
!snapshotDescriptions.isEmpty()) { 1921 for (SnapshotDescription snap : snapshotDescriptions) { 1922 if (snap.getName().equals(snapshotName)) { 1923 tableName = snap.getTableName(); 1924 break; 1925 } 1926 } 1927 } 1928 if (tableName == null) { 1929 future.completeExceptionally(new RestoreSnapshotException( 1930 "Unable to find the table name for snapshot=" + snapshotName)); 1931 return; 1932 } 1933 final TableName finalTableName = tableName; 1934 addListener(tableExists(finalTableName), (exists, err2) -> { 1935 if (err2 != null) { 1936 future.completeExceptionally(err2); 1937 } else if (!exists) { 1938 // if table does not exist, then just clone snapshot into new table. 1939 completeConditionalOnFuture(future, 1940 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl)); 1941 } else { 1942 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 1943 if (err4 != null) { 1944 future.completeExceptionally(err4); 1945 } else if (!disabled) { 1946 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 1947 } else { 1948 completeConditionalOnFuture(future, 1949 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 1950 } 1951 }); 1952 } 1953 }); 1954 }); 1955 return future; 1956 } 1957 1958 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 1959 boolean takeFailSafeSnapshot, boolean restoreAcl) { 1960 if (takeFailSafeSnapshot) { 1961 CompletableFuture<Void> future = new CompletableFuture<>(); 1962 // Step.1 Take a snapshot of the current state 1963 String failSafeSnapshotSnapshotNameFormat = 1964 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 1965 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 1966 final String failSafeSnapshotSnapshotName = 1967 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 1968 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 1969 
.replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 1970 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1971 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 1972 if (err != null) { 1973 future.completeExceptionally(err); 1974 } else { 1975 // Step.2 Restore snapshot 1976 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl), 1977 (void2, err2) -> { 1978 if (err2 != null) { 1979 // Step.3.a Something went wrong during the restore and try to rollback. 1980 addListener( 1981 internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl), 1982 (void3, err3) -> { 1983 if (err3 != null) { 1984 future.completeExceptionally(err3); 1985 } else { 1986 String msg = 1987 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" + 1988 failSafeSnapshotSnapshotName + " succeeded."; 1989 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 1990 } 1991 }); 1992 } else { 1993 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 
1994 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1995 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 1996 if (err3 != null) { 1997 LOG.error( 1998 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 1999 err3); 2000 future.completeExceptionally(err3); 2001 } else { 2002 future.complete(ret3); 2003 } 2004 }); 2005 } 2006 }); 2007 } 2008 }); 2009 return future; 2010 } else { 2011 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl); 2012 } 2013 } 2014 2015 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2016 CompletableFuture<T> parentFuture) { 2017 addListener(parentFuture, (res, err) -> { 2018 if (err != null) { 2019 dependentFuture.completeExceptionally(err); 2020 } else { 2021 dependentFuture.complete(res); 2022 } 2023 }); 2024 } 2025 2026 @Override 2027 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2028 boolean restoreAcl) { 2029 CompletableFuture<Void> future = new CompletableFuture<>(); 2030 addListener(tableExists(tableName), (exists, err) -> { 2031 if (err != null) { 2032 future.completeExceptionally(err); 2033 } else if (exists) { 2034 future.completeExceptionally(new TableExistsException(tableName)); 2035 } else { 2036 completeConditionalOnFuture(future, 2037 internalRestoreSnapshot(snapshotName, tableName, restoreAcl)); 2038 } 2039 }); 2040 return future; 2041 } 2042 2043 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2044 boolean restoreAcl) { 2045 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2046 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2047 try { 2048 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2049 } catch (IllegalArgumentException e) { 2050 return failedFuture(e); 2051 } 2052 return waitProcedureResult(this.<Long> 
newMasterCaller().action((controller, stub) -> this 2053 .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub, 2054 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2055 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(), 2056 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2057 .call()); 2058 } 2059 2060 @Override 2061 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2062 return getCompletedSnapshots(null); 2063 } 2064 2065 @Override 2066 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2067 Preconditions.checkNotNull(pattern, 2068 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2069 return getCompletedSnapshots(pattern); 2070 } 2071 2072 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2073 return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this 2074 .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> 2075 call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), 2076 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2077 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2078 .call(); 2079 } 2080 2081 @Override 2082 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2083 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2084 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2085 return getCompletedSnapshots(tableNamePattern, null); 2086 } 2087 2088 @Override 2089 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2090 Pattern snapshotNamePattern) { 2091 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 
2092 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2093 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2094 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2095 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2096 } 2097 2098 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots( 2099 Pattern tableNamePattern, Pattern snapshotNamePattern) { 2100 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2101 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2102 if (err != null) { 2103 future.completeExceptionally(err); 2104 return; 2105 } 2106 if (tableNames == null || tableNames.size() <= 0) { 2107 future.complete(Collections.emptyList()); 2108 return; 2109 } 2110 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2111 if (err2 != null) { 2112 future.completeExceptionally(err2); 2113 return; 2114 } 2115 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2116 future.complete(Collections.emptyList()); 2117 return; 2118 } 2119 future.complete(snapshotDescList.stream() 2120 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2121 .collect(Collectors.toList())); 2122 }); 2123 }); 2124 return future; 2125 } 2126 2127 @Override 2128 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2129 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2130 } 2131 2132 @Override 2133 public CompletableFuture<Void> deleteSnapshots() { 2134 return internalDeleteSnapshots(null, null); 2135 } 2136 2137 @Override 2138 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2139 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
2140 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2141 return internalDeleteSnapshots(null, snapshotNamePattern); 2142 } 2143 2144 @Override 2145 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2146 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2147 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2148 return internalDeleteSnapshots(tableNamePattern, null); 2149 } 2150 2151 @Override 2152 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2153 Pattern snapshotNamePattern) { 2154 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2155 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2156 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2157 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2158 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2159 } 2160 2161 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2162 Pattern snapshotNamePattern) { 2163 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2164 if (tableNamePattern == null) { 2165 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2166 } else { 2167 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2168 } 2169 CompletableFuture<Void> future = new CompletableFuture<>(); 2170 addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> { 2171 if (err != null) { 2172 future.completeExceptionally(err); 2173 return; 2174 } 2175 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2176 future.complete(null); 2177 return; 2178 } 2179 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2180 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2181 if (e != null) { 
2182 future.completeExceptionally(e); 2183 } else { 2184 future.complete(v); 2185 } 2186 }); 2187 })); 2188 return future; 2189 } 2190 2191 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2192 return this 2193 .<Void> newMasterCaller() 2194 .action( 2195 (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2196 controller, 2197 stub, 2198 DeleteSnapshotRequest.newBuilder() 2199 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 2200 req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call(); 2201 } 2202 2203 @Override 2204 public CompletableFuture<Void> execProcedure(String signature, String instance, 2205 Map<String, String> props) { 2206 CompletableFuture<Void> future = new CompletableFuture<>(); 2207 ProcedureDescription procDesc = 2208 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2209 addListener(this.<Long> newMasterCaller() 2210 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2211 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2212 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2213 .call(), (expectedTimeout, err) -> { 2214 if (err != null) { 2215 future.completeExceptionally(err); 2216 return; 2217 } 2218 TimerTask pollingTask = new TimerTask() { 2219 int tries = 0; 2220 long startTime = EnvironmentEdgeManager.currentTime(); 2221 long endTime = startTime + expectedTimeout; 2222 long maxPauseTime = expectedTimeout / maxAttempts; 2223 2224 @Override 2225 public void run(Timeout timeout) throws Exception { 2226 if (EnvironmentEdgeManager.currentTime() < endTime) { 2227 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2228 if (err2 != null) { 2229 future.completeExceptionally(err2); 2230 return; 2231 } 2232 if (done) { 2233 future.complete(null); 2234 } else { 2235 // retry 
again after pauseTime. 2236 long pauseTime = 2237 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2238 pauseTime = Math.min(pauseTime, maxPauseTime); 2239 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2240 TimeUnit.MICROSECONDS); 2241 } 2242 }); 2243 } else { 2244 future.completeExceptionally(new IOException("Procedure '" + signature + " : " + 2245 instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2246 } 2247 } 2248 }; 2249 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2250 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2251 }); 2252 return future; 2253 } 2254 2255 @Override 2256 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2257 Map<String, String> props) { 2258 ProcedureDescription proDesc = 2259 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2260 return this.<byte[]> newMasterCaller() 2261 .action( 2262 (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2263 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2264 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2265 resp -> resp.hasReturnData() ? 
resp.getReturnData().toByteArray() : null)) 2266 .call(); 2267 } 2268 2269 @Override 2270 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2271 Map<String, String> props) { 2272 ProcedureDescription proDesc = 2273 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2274 return this.<Boolean> newMasterCaller() 2275 .action((controller, stub) -> this 2276 .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub, 2277 IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2278 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2279 .call(); 2280 } 2281 2282 @Override 2283 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2284 return this.<Boolean> newMasterCaller().action( 2285 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2286 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2287 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2288 .call(); 2289 } 2290 2291 @Override 2292 public CompletableFuture<String> getProcedures() { 2293 return this 2294 .<String> newMasterCaller() 2295 .action( 2296 (controller, stub) -> this 2297 .<GetProceduresRequest, GetProceduresResponse, String> call( 2298 controller, stub, GetProceduresRequest.newBuilder().build(), 2299 (s, c, req, done) -> s.getProcedures(c, req, done), 2300 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call(); 2301 } 2302 2303 @Override 2304 public CompletableFuture<String> getLocks() { 2305 return this 2306 .<String> newMasterCaller() 2307 .action( 2308 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call( 2309 controller, stub, GetLocksRequest.newBuilder().build(), 2310 (s, c, req, done) -> s.getLocks(c, req, done), 2311 resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call(); 2312 } 2313 
2314 @Override 2315 public CompletableFuture<Void> decommissionRegionServers( 2316 List<ServerName> servers, boolean offload) { 2317 return this.<Void> newMasterCaller() 2318 .action((controller, stub) -> this 2319 .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call( 2320 controller, stub, 2321 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2322 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2323 .call(); 2324 } 2325 2326 @Override 2327 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2328 return this.<List<ServerName>> newMasterCaller() 2329 .action((controller, stub) -> this 2330 .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse, 2331 List<ServerName>> call( 2332 controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(), 2333 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2334 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2335 .collect(Collectors.toList()))) 2336 .call(); 2337 } 2338 2339 @Override 2340 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2341 List<byte[]> encodedRegionNames) { 2342 return this.<Void> newMasterCaller() 2343 .action((controller, stub) -> 2344 this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call( 2345 controller, stub, RequestConverter.buildRecommissionRegionServerRequest( 2346 server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer( 2347 c, req, done), resp -> null)).call(); 2348 } 2349 2350 /** 2351 * Get the region location for the passed region name. The region name may be a full region name 2352 * or encoded region name. 
If the region does not found, then it'll throw an 2353 * UnknownRegionException wrapped by a {@link CompletableFuture} 2354 * @param regionNameOrEncodedRegionName region name or encoded region name 2355 * @return region location, wrapped by a {@link CompletableFuture} 2356 */ 2357 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2358 if (regionNameOrEncodedRegionName == null) { 2359 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2360 } 2361 try { 2362 CompletableFuture<Optional<HRegionLocation>> future; 2363 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2364 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2365 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2366 // old format encodedName, should be meta region 2367 future = connection.registry.getMetaRegionLocations() 2368 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2369 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2370 } else { 2371 future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2372 regionNameOrEncodedRegionName); 2373 } 2374 } else { 2375 RegionInfo regionInfo = 2376 MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2377 if (regionInfo.isMetaRegion()) { 2378 future = connection.registry.getMetaRegionLocations() 2379 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2380 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2381 .findFirst()); 2382 } else { 2383 future = 2384 AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2385 } 2386 } 2387 2388 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2389 addListener(future, (location, err) -> { 2390 if (err != null) { 2391 returnedFuture.completeExceptionally(err); 2392 return; 2393 } 2394 if (!location.isPresent() || 
location.get().getRegion() == null) { 2395 returnedFuture.completeExceptionally( 2396 new UnknownRegionException("Invalid region name or encoded region name: " + 2397 Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2398 } else { 2399 returnedFuture.complete(location.get()); 2400 } 2401 }); 2402 return returnedFuture; 2403 } catch (IOException e) { 2404 return failedFuture(e); 2405 } 2406 } 2407 2408 /** 2409 * Get the region info for the passed region name. The region name may be a full region name or 2410 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2411 * wrapped by a {@link CompletableFuture} 2412 * @return region info, wrapped by a {@link CompletableFuture} 2413 */ 2414 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2415 if (regionNameOrEncodedRegionName == null) { 2416 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2417 } 2418 2419 if (Bytes.equals(regionNameOrEncodedRegionName, 2420 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) || 2421 Bytes.equals(regionNameOrEncodedRegionName, 2422 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { 2423 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2424 } 2425 2426 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2427 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2428 if (err != null) { 2429 future.completeExceptionally(err); 2430 } else { 2431 future.complete(location.getRegion()); 2432 } 2433 }); 2434 return future; 2435 } 2436 2437 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2438 if (numRegions < 3) { 2439 throw new IllegalArgumentException("Must create at least three regions"); 2440 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2441 throw new IllegalArgumentException("Start key must be smaller than end key"); 2442 } 2443 
if (numRegions == 3) { 2444 return new byte[][] { startKey, endKey }; 2445 } 2446 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2447 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2448 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2449 } 2450 return splitKeys; 2451 } 2452 2453 private void verifySplitKeys(byte[][] splitKeys) { 2454 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2455 // Verify there are no duplicate split keys 2456 byte[] lastKey = null; 2457 for (byte[] splitKey : splitKeys) { 2458 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2459 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2460 } 2461 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2462 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2463 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2464 } 2465 lastKey = splitKey; 2466 } 2467 } 2468 2469 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2470 2471 abstract void onFinished(); 2472 2473 abstract void onError(Throwable error); 2474 2475 @Override 2476 public void accept(Void v, Throwable error) { 2477 if (error != null) { 2478 onError(error); 2479 return; 2480 } 2481 onFinished(); 2482 } 2483 } 2484 2485 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2486 protected final TableName tableName; 2487 2488 TableProcedureBiConsumer(TableName tableName) { 2489 this.tableName = tableName; 2490 } 2491 2492 abstract String getOperationType(); 2493 2494 String getDescription() { 2495 return "Operation: " + getOperationType() + ", " + "Table Name: " 2496 + tableName.getNameWithNamespaceInclAsString(); 2497 } 2498 2499 @Override 2500 void onFinished() { 2501 LOG.info(getDescription() + " completed"); 2502 } 2503 2504 @Override 2505 void onError(Throwable 
error) { 2506 LOG.info(getDescription() + " failed with " + error.getMessage()); 2507 } 2508 } 2509 2510 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2511 protected final String namespaceName; 2512 2513 NamespaceProcedureBiConsumer(String namespaceName) { 2514 this.namespaceName = namespaceName; 2515 } 2516 2517 abstract String getOperationType(); 2518 2519 String getDescription() { 2520 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2521 } 2522 2523 @Override 2524 void onFinished() { 2525 LOG.info(getDescription() + " completed"); 2526 } 2527 2528 @Override 2529 void onError(Throwable error) { 2530 LOG.info(getDescription() + " failed with " + error.getMessage()); 2531 } 2532 } 2533 2534 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2535 2536 CreateTableProcedureBiConsumer(TableName tableName) { 2537 super(tableName); 2538 } 2539 2540 @Override 2541 String getOperationType() { 2542 return "CREATE"; 2543 } 2544 } 2545 2546 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2547 2548 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2549 super(tableName); 2550 } 2551 2552 @Override 2553 String getOperationType() { 2554 return "ENABLE"; 2555 } 2556 } 2557 2558 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2559 2560 DeleteTableProcedureBiConsumer(TableName tableName) { 2561 super(tableName); 2562 } 2563 2564 @Override 2565 String getOperationType() { 2566 return "DELETE"; 2567 } 2568 2569 @Override 2570 void onFinished() { 2571 connection.getLocator().clearCache(this.tableName); 2572 super.onFinished(); 2573 } 2574 } 2575 2576 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2577 2578 TruncateTableProcedureBiConsumer(TableName tableName) { 2579 super(tableName); 2580 } 2581 2582 @Override 2583 String getOperationType() { 2584 
return "TRUNCATE"; 2585 } 2586 } 2587 2588 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2589 2590 EnableTableProcedureBiConsumer(TableName tableName) { 2591 super(tableName); 2592 } 2593 2594 @Override 2595 String getOperationType() { 2596 return "ENABLE"; 2597 } 2598 } 2599 2600 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2601 2602 DisableTableProcedureBiConsumer(TableName tableName) { 2603 super(tableName); 2604 } 2605 2606 @Override 2607 String getOperationType() { 2608 return "DISABLE"; 2609 } 2610 } 2611 2612 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2613 2614 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2615 super(tableName); 2616 } 2617 2618 @Override 2619 String getOperationType() { 2620 return "ADD_COLUMN_FAMILY"; 2621 } 2622 } 2623 2624 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2625 2626 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2627 super(tableName); 2628 } 2629 2630 @Override 2631 String getOperationType() { 2632 return "DELETE_COLUMN_FAMILY"; 2633 } 2634 } 2635 2636 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2637 2638 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2639 super(tableName); 2640 } 2641 2642 @Override 2643 String getOperationType() { 2644 return "MODIFY_COLUMN_FAMILY"; 2645 } 2646 } 2647 2648 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2649 2650 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2651 super(namespaceName); 2652 } 2653 2654 @Override 2655 String getOperationType() { 2656 return "CREATE_NAMESPACE"; 2657 } 2658 } 2659 2660 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2661 2662 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2663 
super(namespaceName); 2664 } 2665 2666 @Override 2667 String getOperationType() { 2668 return "DELETE_NAMESPACE"; 2669 } 2670 } 2671 2672 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2673 2674 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2675 super(namespaceName); 2676 } 2677 2678 @Override 2679 String getOperationType() { 2680 return "MODIFY_NAMESPACE"; 2681 } 2682 } 2683 2684 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2685 2686 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2687 super(tableName); 2688 } 2689 2690 @Override 2691 String getOperationType() { 2692 return "MERGE_REGIONS"; 2693 } 2694 } 2695 2696 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2697 2698 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2699 super(tableName); 2700 } 2701 2702 @Override 2703 String getOperationType() { 2704 return "SPLIT_REGION"; 2705 } 2706 } 2707 2708 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2709 private final String peerId; 2710 private final Supplier<String> getOperation; 2711 2712 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2713 this.peerId = peerId; 2714 this.getOperation = getOperation; 2715 } 2716 2717 String getDescription() { 2718 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2719 } 2720 2721 @Override 2722 void onFinished() { 2723 LOG.info(getDescription() + " completed"); 2724 } 2725 2726 @Override 2727 void onError(Throwable error) { 2728 LOG.info(getDescription() + " failed with " + error.getMessage()); 2729 } 2730 } 2731 2732 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2733 CompletableFuture<Void> future = new CompletableFuture<>(); 2734 addListener(procFuture, (procId, error) -> { 2735 if (error != null) { 2736 
future.completeExceptionally(error); 2737 return; 2738 } 2739 getProcedureResult(procId, future, 0); 2740 }); 2741 return future; 2742 } 2743 2744 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2745 addListener( 2746 this.<GetProcedureResultResponse> newMasterCaller() 2747 .action((controller, stub) -> this 2748 .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( 2749 controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2750 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2751 .call(), 2752 (response, error) -> { 2753 if (error != null) { 2754 LOG.warn("failed to get the procedure result procId={}", procId, 2755 ConnectionUtils.translateException(error)); 2756 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2757 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2758 return; 2759 } 2760 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2761 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2762 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2763 return; 2764 } 2765 if (response.hasException()) { 2766 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2767 future.completeExceptionally(ioe); 2768 } else { 2769 future.complete(null); 2770 } 2771 }); 2772 } 2773 2774 private <T> CompletableFuture<T> failedFuture(Throwable error) { 2775 CompletableFuture<T> future = new CompletableFuture<>(); 2776 future.completeExceptionally(error); 2777 return future; 2778 } 2779 2780 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2781 if (error != null) { 2782 future.completeExceptionally(error); 2783 return true; 2784 } 2785 return false; 2786 } 2787 2788 @Override 2789 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2790 return 
getClusterMetrics(EnumSet.allOf(Option.class)); 2791 } 2792 2793 @Override 2794 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2795 return this 2796 .<ClusterMetrics> newMasterCaller() 2797 .action( 2798 (controller, stub) -> this 2799 .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller, 2800 stub, RequestConverter.buildGetClusterStatusRequest(options), 2801 (s, c, req, done) -> s.getClusterStatus(c, req, done), 2802 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call(); 2803 } 2804 2805 @Override 2806 public CompletableFuture<Void> shutdown() { 2807 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2808 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2809 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 2810 resp -> null)) 2811 .call(); 2812 } 2813 2814 @Override 2815 public CompletableFuture<Void> stopMaster() { 2816 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2817 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 2818 controller, stub, StopMasterRequest.newBuilder().build(), 2819 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 2820 .call(); 2821 } 2822 2823 @Override 2824 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2825 StopServerRequest request = RequestConverter 2826 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 2827 return this.<Void> newAdminCaller().priority(HIGH_QOS) 2828 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2829 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2830 resp -> null)) 2831 .serverName(serverName).call(); 2832 } 2833 2834 @Override 2835 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2836 return this 2837 
.<Void> newAdminCaller() 2838 .action( 2839 (controller, stub) -> this 2840 .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall( 2841 controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2842 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2843 .serverName(serverName).call(); 2844 } 2845 2846 @Override 2847 public CompletableFuture<Void> updateConfiguration() { 2848 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2849 addListener( 2850 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 2851 (status, err) -> { 2852 if (err != null) { 2853 future.completeExceptionally(err); 2854 } else { 2855 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2856 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 2857 futures.add(updateConfiguration(status.getMasterName())); 2858 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2859 addListener( 2860 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2861 (result, err2) -> { 2862 if (err2 != null) { 2863 future.completeExceptionally(err2); 2864 } else { 2865 future.complete(result); 2866 } 2867 }); 2868 } 2869 }); 2870 return future; 2871 } 2872 2873 @Override 2874 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 2875 return this 2876 .<Void> newAdminCaller() 2877 .action( 2878 (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall( 2879 controller, stub, RequestConverter.buildRollWALWriterRequest(), 2880 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 2881 .serverName(serverName).call(); 2882 } 2883 2884 @Override 2885 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 2886 return this 2887 .<Void> newAdminCaller() 2888 .action( 2889 (controller, stub) -> this 2890 
.<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall( 2891 controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s, 2892 c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 2893 .serverName(serverName).call(); 2894 } 2895 2896 @Override 2897 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 2898 return this 2899 .<List<SecurityCapability>> newMasterCaller() 2900 .action( 2901 (controller, stub) -> this 2902 .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> 2903 call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), 2904 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 2905 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 2906 .call(); 2907 } 2908 2909 @Override 2910 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 2911 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 2912 } 2913 2914 @Override 2915 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 2916 TableName tableName) { 2917 Preconditions.checkNotNull(tableName, 2918 "tableName is null. 
If you don't specify a tableName, use getRegionLoads() instead"); 2919 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 2920 } 2921 2922 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 2923 ServerName serverName) { 2924 return this.<List<RegionMetrics>> newAdminCaller() 2925 .action((controller, stub) -> this 2926 .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>> 2927 adminCall(controller, stub, request, (s, c, req, done) -> 2928 s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics)) 2929 .serverName(serverName).call(); 2930 } 2931 2932 @Override 2933 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 2934 return this 2935 .<Boolean> newMasterCaller() 2936 .action( 2937 (controller, stub) -> this 2938 .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller, 2939 stub, IsInMaintenanceModeRequest.newBuilder().build(), 2940 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 2941 resp -> resp.getInMaintenanceMode())).call(); 2942 } 2943 2944 @Override 2945 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 2946 CompactType compactType) { 2947 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 2948 2949 switch (compactType) { 2950 case MOB: 2951 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 2952 if (err != null) { 2953 future.completeExceptionally(err); 2954 return; 2955 } 2956 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 2957 2958 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 2959 .action((controller, stub) -> this 2960 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 2961 controller, stub, 2962 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 2963 (s, c, req, done) -> 
s.getRegionInfo(controller, req, done), resp -> resp)) 2964 .call(), (resp2, err2) -> { 2965 if (err2 != null) { 2966 future.completeExceptionally(err2); 2967 } else { 2968 if (resp2.hasCompactionState()) { 2969 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 2970 } else { 2971 future.complete(CompactionState.NONE); 2972 } 2973 } 2974 }); 2975 }); 2976 break; 2977 case NORMAL: 2978 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 2979 if (err != null) { 2980 future.completeExceptionally(err); 2981 return; 2982 } 2983 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 2984 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 2985 locations.stream().filter(loc -> loc.getServerName() != null) 2986 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 2987 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 2988 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 2989 // If any region compaction state is MAJOR_AND_MINOR 2990 // the table compaction state is MAJOR_AND_MINOR, too. 
2991 if (err2 != null) { 2992 future.completeExceptionally(unwrapCompletionException(err2)); 2993 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 2994 future.complete(regionState); 2995 } else { 2996 regionStates.add(regionState); 2997 } 2998 })); 2999 }); 3000 addListener( 3001 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3002 (ret, err3) -> { 3003 // If future not completed, check all regions's compaction state 3004 if (!future.isCompletedExceptionally() && !future.isDone()) { 3005 CompactionState state = CompactionState.NONE; 3006 for (CompactionState regionState : regionStates) { 3007 switch (regionState) { 3008 case MAJOR: 3009 if (state == CompactionState.MINOR) { 3010 future.complete(CompactionState.MAJOR_AND_MINOR); 3011 } else { 3012 state = CompactionState.MAJOR; 3013 } 3014 break; 3015 case MINOR: 3016 if (state == CompactionState.MAJOR) { 3017 future.complete(CompactionState.MAJOR_AND_MINOR); 3018 } else { 3019 state = CompactionState.MINOR; 3020 } 3021 break; 3022 case NONE: 3023 default: 3024 } 3025 } 3026 if (!future.isDone()) { 3027 future.complete(state); 3028 } 3029 } 3030 }); 3031 }); 3032 break; 3033 default: 3034 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3035 } 3036 3037 return future; 3038 } 3039 3040 @Override 3041 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3042 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3043 addListener(getRegionLocation(regionName), (location, err) -> { 3044 if (err != null) { 3045 future.completeExceptionally(err); 3046 return; 3047 } 3048 ServerName serverName = location.getServerName(); 3049 if (serverName == null) { 3050 future 3051 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3052 return; 3053 } 3054 addListener( 3055 this.<GetRegionInfoResponse> newAdminCaller() 3056 .action((controller, stub) -> this 3057 
.<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 3058 controller, stub, 3059 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3060 true), 3061 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3062 .serverName(serverName).call(), 3063 (resp2, err2) -> { 3064 if (err2 != null) { 3065 future.completeExceptionally(err2); 3066 } else { 3067 if (resp2.hasCompactionState()) { 3068 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3069 } else { 3070 future.complete(CompactionState.NONE); 3071 } 3072 } 3073 }); 3074 }); 3075 return future; 3076 } 3077 3078 @Override 3079 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3080 MajorCompactionTimestampRequest request = 3081 MajorCompactionTimestampRequest.newBuilder() 3082 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3083 return this.<Optional<Long>> newMasterCaller().action((controller, stub) -> 3084 this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> 3085 call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp( 3086 c, req, done), ProtobufUtil::toOptionalTimestamp)).call(); 3087 } 3088 3089 @Override 3090 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion( 3091 byte[] regionName) { 3092 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3093 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3094 addListener(getRegionInfo(regionName), (region, err) -> { 3095 if (err != null) { 3096 future.completeExceptionally(err); 3097 return; 3098 } 3099 MajorCompactionTimestampForRegionRequest.Builder builder = 3100 MajorCompactionTimestampForRegionRequest.newBuilder(); 3101 builder.setRegion( 3102 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3103 
addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this 3104 .<MajorCompactionTimestampForRegionRequest, 3105 MajorCompactionTimestampResponse, Optional<Long>> call( 3106 controller, stub, builder.build(), 3107 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3108 ProtobufUtil::toOptionalTimestamp)) 3109 .call(), (timestamp, err2) -> { 3110 if (err2 != null) { 3111 future.completeExceptionally(err2); 3112 } else { 3113 future.complete(timestamp); 3114 } 3115 }); 3116 }); 3117 return future; 3118 } 3119 3120 @Override 3121 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3122 List<String> serverNamesList) { 3123 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3124 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3125 if (err != null) { 3126 future.completeExceptionally(err); 3127 return; 3128 } 3129 // Accessed by multiple threads. 3130 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3131 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3132 serverNames.stream().forEach(serverName -> { 3133 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3134 if (err2 != null) { 3135 future.completeExceptionally(unwrapCompletionException(err2)); 3136 } else { 3137 serverStates.put(serverName, serverState); 3138 } 3139 })); 3140 }); 3141 addListener( 3142 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3143 (ret, err3) -> { 3144 if (!future.isCompletedExceptionally()) { 3145 if (err3 != null) { 3146 future.completeExceptionally(err3); 3147 } else { 3148 future.complete(serverStates); 3149 } 3150 } 3151 }); 3152 }); 3153 return future; 3154 } 3155 3156 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3157 CompletableFuture<List<ServerName>> 
future = new CompletableFuture<>(); 3158 if (serverNamesList.isEmpty()) { 3159 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3160 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3161 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3162 if (err != null) { 3163 future.completeExceptionally(err); 3164 } else { 3165 future.complete(clusterMetrics.getServersName()); 3166 } 3167 }); 3168 return future; 3169 } else { 3170 List<ServerName> serverList = new ArrayList<>(); 3171 for (String regionServerName : serverNamesList) { 3172 ServerName serverName = null; 3173 try { 3174 serverName = ServerName.valueOf(regionServerName); 3175 } catch (Exception e) { 3176 future.completeExceptionally( 3177 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3178 } 3179 if (serverName == null) { 3180 future.completeExceptionally( 3181 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3182 } else { 3183 serverList.add(serverName); 3184 } 3185 } 3186 future.complete(serverList); 3187 } 3188 return future; 3189 } 3190 3191 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3192 return this 3193 .<Boolean>newAdminCaller() 3194 .serverName(serverName) 3195 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3196 Boolean>adminCall(controller, stub, 3197 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) -> 3198 s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call(); 3199 } 3200 3201 @Override 3202 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3203 return this.<Boolean> newMasterCaller() 3204 .action((controller, stub) -> this 3205 .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub, 3206 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3207 (s, c, req, done) -> 
s.setBalancerRunning(c, req, done), 3208 (resp) -> resp.getPrevBalanceValue())) 3209 .call(); 3210 } 3211 3212 @Override 3213 public CompletableFuture<Boolean> balance(boolean forcible) { 3214 return this 3215 .<Boolean> newMasterCaller() 3216 .action( 3217 (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller, 3218 stub, RequestConverter.buildBalanceRequest(forcible), 3219 (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call(); 3220 } 3221 3222 @Override 3223 public CompletableFuture<Boolean> isBalancerEnabled() { 3224 return this 3225 .<Boolean> newMasterCaller() 3226 .action((controller, stub) -> 3227 this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller, 3228 stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done) 3229 -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call(); 3230 } 3231 3232 @Override 3233 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3234 return this 3235 .<Boolean> newMasterCaller() 3236 .action( 3237 (controller, stub) -> this 3238 .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call( 3239 controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c, 3240 req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp 3241 .getPrevNormalizerValue())).call(); 3242 } 3243 3244 @Override 3245 public CompletableFuture<Boolean> isNormalizerEnabled() { 3246 return this 3247 .<Boolean> newMasterCaller() 3248 .action( 3249 (controller, stub) -> this 3250 .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller, 3251 stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3252 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), 3253 (resp) -> resp.getEnabled())).call(); 3254 } 3255 3256 @Override 3257 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3258 return 
normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3259 } 3260 3261 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3262 return this 3263 .<Boolean> newMasterCaller() 3264 .action( 3265 (controller, stub) -> this.call( 3266 controller, stub, request, MasterService.Interface::normalize, 3267 NormalizeResponse::getNormalizerRan)) 3268 .call(); 3269 } 3270 3271 @Override 3272 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3273 return this 3274 .<Boolean> newMasterCaller() 3275 .action( 3276 (controller, stub) -> this 3277 .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call( 3278 controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s, 3279 c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp 3280 .getPrevValue())).call(); 3281 } 3282 3283 @Override 3284 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3285 return this 3286 .<Boolean> newMasterCaller() 3287 .action( 3288 (controller, stub) -> this 3289 .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call( 3290 controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req, 3291 done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3292 .call(); 3293 } 3294 3295 @Override 3296 public CompletableFuture<Boolean> runCleanerChore() { 3297 return this 3298 .<Boolean> newMasterCaller() 3299 .action( 3300 (controller, stub) -> this 3301 .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub, 3302 RequestConverter.buildRunCleanerChoreRequest(), 3303 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3304 (resp) -> resp.getCleanerChoreRan())).call(); 3305 } 3306 3307 @Override 3308 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3309 return this 3310 .<Boolean> newMasterCaller() 3311 .action( 3312 (controller, stub) -> this 3313 .<EnableCatalogJanitorRequest, 
EnableCatalogJanitorResponse, Boolean> call( 3314 controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s, 3315 c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp 3316 .getPrevValue())).call(); 3317 } 3318 3319 @Override 3320 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3321 return this 3322 .<Boolean> newMasterCaller() 3323 .action( 3324 (controller, stub) -> this 3325 .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call( 3326 controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c, 3327 req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp 3328 .getValue())).call(); 3329 } 3330 3331 @Override 3332 public CompletableFuture<Integer> runCatalogJanitor() { 3333 return this 3334 .<Integer> newMasterCaller() 3335 .action( 3336 (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call( 3337 controller, stub, RequestConverter.buildCatalogScanRequest(), 3338 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3339 .call(); 3340 } 3341 3342 @Override 3343 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3344 ServiceCaller<S, R> callable) { 3345 MasterCoprocessorRpcChannelImpl channel = 3346 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3347 S stub = stubMaker.apply(channel); 3348 CompletableFuture<R> future = new CompletableFuture<>(); 3349 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3350 callable.call(stub, controller, resp -> { 3351 if (controller.failed()) { 3352 future.completeExceptionally(controller.getFailed()); 3353 } else { 3354 future.complete(resp); 3355 } 3356 }); 3357 return future; 3358 } 3359 3360 @Override 3361 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3362 ServiceCaller<S, R> callable, ServerName serverName) { 3363 
RegionServerCoprocessorRpcChannelImpl channel = 3364 new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName( 3365 serverName)); 3366 S stub = stubMaker.apply(channel); 3367 CompletableFuture<R> future = new CompletableFuture<>(); 3368 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3369 callable.call(stub, controller, resp -> { 3370 if (controller.failed()) { 3371 future.completeExceptionally(controller.getFailed()); 3372 } else { 3373 future.complete(resp); 3374 } 3375 }); 3376 return future; 3377 } 3378 3379 @Override 3380 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3381 return this.<List<ServerName>> newMasterCaller() 3382 .action((controller, stub) -> this 3383 .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call( 3384 controller, stub, RequestConverter.buildClearDeadServersRequest(servers), 3385 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3386 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3387 .call(); 3388 } 3389 3390 private <T> ServerRequestCallerBuilder<T> newServerCaller() { 3391 return this.connection.callerFactory.<T> serverRequest() 3392 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3393 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3394 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 3395 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3396 } 3397 3398 @Override 3399 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3400 if (tableName == null) { 3401 return failedFuture(new IllegalArgumentException("Table name is null")); 3402 } 3403 CompletableFuture<Void> future = new CompletableFuture<>(); 3404 addListener(tableExists(tableName), (exist, err) -> { 3405 if (err != null) { 3406 future.completeExceptionally(err); 3407 return; 3408 } 3409 if (!exist) { 3410 future.completeExceptionally(new 
TableNotFoundException( 3411 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3412 return; 3413 } 3414 addListener(getTableSplits(tableName), (splits, err1) -> { 3415 if (err1 != null) { 3416 future.completeExceptionally(err1); 3417 } else { 3418 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3419 if (err2 != null) { 3420 future.completeExceptionally(err2); 3421 } else { 3422 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3423 if (err3 != null) { 3424 future.completeExceptionally(err3); 3425 } else { 3426 future.complete(result3); 3427 } 3428 }); 3429 } 3430 }); 3431 } 3432 }); 3433 }); 3434 return future; 3435 } 3436 3437 @Override 3438 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3439 if (tableName == null) { 3440 return failedFuture(new IllegalArgumentException("Table name is null")); 3441 } 3442 CompletableFuture<Void> future = new CompletableFuture<>(); 3443 addListener(tableExists(tableName), (exist, err) -> { 3444 if (err != null) { 3445 future.completeExceptionally(err); 3446 return; 3447 } 3448 if (!exist) { 3449 future.completeExceptionally(new TableNotFoundException( 3450 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3451 return; 3452 } 3453 addListener(setTableReplication(tableName, false), (result, err2) -> { 3454 if (err2 != null) { 3455 future.completeExceptionally(err2); 3456 } else { 3457 future.complete(result); 3458 } 3459 }); 3460 }); 3461 return future; 3462 } 3463 3464 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3465 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3466 addListener( 3467 getRegions(tableName).thenApply(regions -> regions.stream() 3468 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3469 (regions, err2) -> { 3470 if (err2 != null) { 3471 future.completeExceptionally(err2); 3472 return; 3473 } 3474 if (regions.size() == 1) { 
3475 future.complete(null); 3476 } else { 3477 byte[][] splits = new byte[regions.size() - 1][]; 3478 for (int i = 1; i < regions.size(); i++) { 3479 splits[i - 1] = regions.get(i).getStartKey(); 3480 } 3481 future.complete(splits); 3482 } 3483 }); 3484 return future; 3485 } 3486 3487 /** 3488 * Connect to peer and check the table descriptor on peer: 3489 * <ol> 3490 * <li>Create the same table on peer when not exist.</li> 3491 * <li>Throw an exception if the table already has replication enabled on any of the column 3492 * families.</li> 3493 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3494 * </ol> 3495 * @param tableName name of the table to sync to the peer 3496 * @param splits table split keys 3497 */ 3498 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3499 byte[][] splits) { 3500 CompletableFuture<Void> future = new CompletableFuture<>(); 3501 addListener(listReplicationPeers(), (peers, err) -> { 3502 if (err != null) { 3503 future.completeExceptionally(err); 3504 return; 3505 } 3506 if (peers == null || peers.size() <= 0) { 3507 future.completeExceptionally( 3508 new IllegalArgumentException("Found no peer cluster for replication.")); 3509 return; 3510 } 3511 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3512 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3513 .forEach(peer -> { 3514 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3515 }); 3516 addListener( 3517 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3518 (result, err2) -> { 3519 if (err2 != null) { 3520 future.completeExceptionally(err2); 3521 } else { 3522 future.complete(result); 3523 } 3524 }); 3525 }); 3526 return future; 3527 } 3528 3529 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3530 ReplicationPeerDescription peer) { 3531 Configuration peerConf = null; 
3532 try { 3533 peerConf = 3534 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3535 } catch (IOException e) { 3536 return failedFuture(e); 3537 } 3538 CompletableFuture<Void> future = new CompletableFuture<>(); 3539 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3540 if (err != null) { 3541 future.completeExceptionally(err); 3542 return; 3543 } 3544 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3545 if (err1 != null) { 3546 future.completeExceptionally(err1); 3547 return; 3548 } 3549 AsyncAdmin peerAdmin = conn.getAdmin(); 3550 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3551 if (err2 != null) { 3552 future.completeExceptionally(err2); 3553 return; 3554 } 3555 if (!exist) { 3556 CompletableFuture<Void> createTableFuture = null; 3557 if (splits == null) { 3558 createTableFuture = peerAdmin.createTable(tableDesc); 3559 } else { 3560 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3561 } 3562 addListener(createTableFuture, (result, err3) -> { 3563 if (err3 != null) { 3564 future.completeExceptionally(err3); 3565 } else { 3566 future.complete(result); 3567 } 3568 }); 3569 } else { 3570 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3571 (result, err4) -> { 3572 if (err4 != null) { 3573 future.completeExceptionally(err4); 3574 } else { 3575 future.complete(result); 3576 } 3577 }); 3578 } 3579 }); 3580 }); 3581 }); 3582 return future; 3583 } 3584 3585 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3586 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3587 CompletableFuture<Void> future = new CompletableFuture<>(); 3588 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3589 if (err != null) { 3590 future.completeExceptionally(err); 3591 return; 3592 } 3593 if (peerTableDesc == null) { 3594 future.completeExceptionally( 
3595 new IllegalArgumentException("Failed to get table descriptor for table " + 3596 tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3597 return; 3598 } 3599 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3600 future.completeExceptionally(new IllegalArgumentException( 3601 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() + 3602 ", but the table descriptors are not same when compared with source cluster." + 3603 " Thus can not enable the table's replication switch.")); 3604 return; 3605 } 3606 future.complete(null); 3607 }); 3608 return future; 3609 } 3610 3611 /** 3612 * Set the table's replication switch if the table's replication switch is already not set. 3613 * @param tableName name of the table 3614 * @param enableRep is replication switch enable or disable 3615 */ 3616 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3617 CompletableFuture<Void> future = new CompletableFuture<>(); 3618 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3619 if (err != null) { 3620 future.completeExceptionally(err); 3621 return; 3622 } 3623 if (!tableDesc.matchReplicationScope(enableRep)) { 3624 int scope = 3625 enableRep ? 
HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3626 TableDescriptor newTableDesc = 3627 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3628 addListener(modifyTable(newTableDesc), (result, err2) -> { 3629 if (err2 != null) { 3630 future.completeExceptionally(err2); 3631 } else { 3632 future.complete(result); 3633 } 3634 }); 3635 } else { 3636 future.complete(null); 3637 } 3638 }); 3639 return future; 3640 } 3641 3642 @Override 3643 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3644 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3645 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3646 if (err != null) { 3647 future.completeExceptionally(err); 3648 return; 3649 } 3650 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3651 locations.stream().filter(l -> l.getRegion() != null) 3652 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3653 .collect(Collectors.groupingBy(l -> l.getServerName(), 3654 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3655 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3656 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3657 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3658 futures 3659 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3660 if (err2 != null) { 3661 future.completeExceptionally(unwrapCompletionException(err2)); 3662 } else { 3663 aggregator.append(stats); 3664 } 3665 })); 3666 } 3667 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3668 (ret, err3) -> { 3669 if (err3 != null) { 3670 future.completeExceptionally(unwrapCompletionException(err3)); 3671 } else { 3672 future.complete(aggregator.sum()); 3673 } 3674 }); 3675 }); 3676 return future; 3677 } 
/**
 * Creates a new table with the descriptor of an existing one.
 * @param tableName existing table to copy the schema from
 * @param newTableName name of the table to create
 * @param preserveSplits when true, the new table is created with the split keys of the
 *          existing table (if it has any)
 * @return future that fails with {@link TableNotFoundException} if {@code tableName} does not
 *         exist, with {@link TableExistsException} if {@code newTableName} already exists, or
 *         with the failure of any intermediate step
 */
@Override
public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
    boolean preserveSplits) {
  CompletableFuture<Void> future = new CompletableFuture<>();
  addListener(tableExists(tableName), (exist, err) -> {
    if (err != null) {
      future.completeExceptionally(err);
      return;
    }
    if (!exist) {
      future.completeExceptionally(new TableNotFoundException(tableName));
      return;
    }
    addListener(tableExists(newTableName), (exist1, err1) -> {
      if (err1 != null) {
        future.completeExceptionally(err1);
        return;
      }
      if (exist1) {
        future.completeExceptionally(new TableExistsException(newTableName));
        return;
      }
      addListener(getDescriptor(tableName), (tableDesc, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
          return;
        }
        TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
        if (preserveSplits) {
          addListener(getTableSplits(tableName), (splits, err3) -> {
            if (err3 != null) {
              future.completeExceptionally(err3);
            } else {
              // A null splits value means there are no split keys to carry over.
              addListener(
                splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
                (result, err4) -> {
                  if (err4 != null) {
                    future.completeExceptionally(err4);
                  } else {
                    future.complete(result);
                  }
                });
            }
          });
        } else {
          addListener(createTable(newTableDesc), (result, err5) -> {
            if (err5 != null) {
              future.completeExceptionally(err5);
            } else {
              future.complete(result);
            }
          });
        }
      });
    });
  });
  return future;
}

/**
 * Sends a clear-region-block-cache request for the given regions to one server.
 * @param serverName server to call
 * @param hris regions to clear
 * @return future holding the response statistics converted with
 *         {@code ProtobufUtil.toCacheEvictionStats}
 */
private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
    List<RegionInfo> hris) {
  return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
    .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
      controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
      (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
      resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
    .serverName(serverName).call();
}

/**
 * Issues a switch-rpc-throttle request.
 * @return future holding the previous throttle-enabled value reported in the response
 */
@Override
public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
  return this.<Boolean> newMasterCaller()
    .action((controller, stub) -> this
      .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
        SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
        (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
        resp -> resp.getPreviousRpcThrottleEnabled()))
    .call();
}

/**
 * Issues an is-rpc-throttle-enabled request.
 * @return future holding the {@code rpcThrottleEnabled} field of the response
 */
@Override
public CompletableFuture<Boolean> isRpcThrottleEnabled() {
  return this.<Boolean> newMasterCaller()
    .action((controller, stub) -> this
      .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
        stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
        (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
        resp -> resp.getRpcThrottleEnabled()))
    .call();
}

/**
 * Issues a switch-exceed-throttle-quota request.
 * @return future holding the previous value reported in the response
 */
@Override
public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
  return this.<Boolean> newMasterCaller()
    .action((controller, stub) -> this
      .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
        controller, stub,
        SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
          .build(),
        (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
        resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
    .call();
}

/**
 * Issues a get-space-quota-region-sizes request.
 * @return future holding a map from table name to the size reported for it in the response
 */
@Override
public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
  return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
    .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
    Map<TableName, Long>> call(controller, stub,
      RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
      (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
      resp -> resp.getSizesList().stream().collect(Collectors
        .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
    .call();
}

/**
 * Issues a get-space-quota-snapshots request to one server.
 * @param serverName server to call
 * @return future holding a map from table name to the snapshot reported for it in the response
 */
@Override
public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
    ServerName serverName) {
  return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
    .action((controller, stub) -> this
      .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
      Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
        RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
        (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
        resp -> resp.getSnapshotsList().stream()
          .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
            snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
    .serverName(serverName).call();
}

/**
 * Issues a get-quota-states request and lets the given converter pick the snapshot out of the
 * response.
 * @param converter extracts the wanted snapshot from the response
 */
private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
    Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
  return this.<SpaceQuotaSnapshot> newMasterCaller()
    .action((controller, stub) -> this
      .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
        RequestConverter.buildGetQuotaStatesRequest(),
        (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
    .call();
}

/**
 * Fetches the quota states and returns the first namespace snapshot whose namespace equals the
 * given one.
 * @return future holding the matching snapshot, or {@code null} when none matches
 */
@Override
public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
  return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
    .filter(s -> s.getNamespace().equals(namespace)).findFirst()
    .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
}

/**
 * Fetches the quota states and returns the first table snapshot whose table name equals the
 * given one.
 * @return future holding the matching snapshot, or {@code null} when none matches
 */
@Override
public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
  HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
  return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
    .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
    .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
}

/**
 * Issues a grant request built from the given permission.
 * @return future completed with {@code null} once the response arrives
 */
@Override
public CompletableFuture<Void> grant(UserPermission userPermission,
    boolean mergeExistingPermissions) {
  return this.<Void> newMasterCaller()
    .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
      stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
      (s, c, req, done) -> s.grant(c, req, done), resp -> null))
    .call();
}

/**
 * Issues a revoke request built from the given permission.
 * @return future completed with {@code null} once the response arrives
 */
@Override
public CompletableFuture<Void> revoke(UserPermission userPermission) {
  return this.<Void> newMasterCaller()
    .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
      stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
      (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
    .call();
}

/**
 * Issues a get-user-permissions request.
 * @return future holding the permissions of the response, each converted with
 *         {@code ShadedAccessControlUtil.toUserPermission}
 */
@Override
public CompletableFuture<List<UserPermission>>
    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
  return this.<List<UserPermission>> newMasterCaller().action((controller,
      stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
        List<UserPermission>> call(controller, stub,
          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
          (s, c, req, done) -> s.getUserPermissions(c, req, done),
          resp -> resp.getUserPermissionList().stream()
            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
            .collect(Collectors.toList())))
    .call();
}

/**
 * Issues a has-user-permissions request.
 * @return future holding the {@code hasUserPermission} list of the response
 */
@Override
public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
    List<Permission> permissions) {
  return this.<List<Boolean>> newMasterCaller()
    .action((controller, stub) -> this
      .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
        stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
        (s, c, req, done) -> s.hasUserPermissions(c, req, done),
        resp -> resp.getHasUserPermissionList()))
    .call();
}

/**
 * Issues a set-snapshot-cleanup request.
 * @return future holding the previous snapshot-cleanup value reported in the response
 */
@Override
public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
    final boolean sync) {
  return this.<Boolean>newMasterCaller()
    .action((controller, stub) -> this
      .call(controller, stub,
        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
        MasterService.Interface::switchSnapshotCleanup,
        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
    .call();
}

/**
 * Issues an is-snapshot-cleanup-enabled request.
 * @return future holding the {@code enabled} field of the response
 */
@Override
public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
  return this.<Boolean>newMasterCaller()
    .action((controller, stub) -> this
      .call(controller, stub,
        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
        MasterService.Interface::isSnapshotCleanupEnabled,
        IsSnapshotCleanupEnabledResponse::getEnabled))
    .call();
}

/**
 * Collects slow/large log entries from the given servers.
 * <p>
 * All per-server requests are started up front and combined once every one of them has
 * finished, so no thread is parked waiting on an individual server. Entries are concatenated
 * in the iteration order of {@code serverNames}.
 * @param filterParams filter parameters forwarded to every server
 * @param serverNames servers to query; null or empty yields an empty result
 * @param limit limit forwarded to every server
 * @param logType log type forwarded to every server
 * @return future holding the concatenated entries; fails if any server call fails
 */
private CompletableFuture<List<LogEntry>> getSlowLogResponses(
    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
    final String logType) {
  if (CollectionUtils.isEmpty(serverNames)) {
    return CompletableFuture.completedFuture(Collections.emptyList());
  }
  List<CompletableFuture<List<LogEntry>>> perServerFutures = serverNames.stream()
    .map(serverName -> getSlowLogResponseFromServer(serverName, filterParams, limit, logType))
    .collect(Collectors.toList());
  return convertToFutureOfList(perServerFutures).thenApply(perServerEntries ->
    perServerEntries.stream()
      .flatMap(List::stream)
      .collect(Collectors.toList()));
}

/**
 * Requests log entries from one server and converts the response with
 * {@code ProtobufUtil::toSlowLogPayloads}.
 */
private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
    Map<String, Object> filterParams, int limit, String logType) {
  return this.<List<LogEntry>>newAdminCaller().action((controller, stub) -> this
    .adminCall(controller, stub,
      RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
      AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
    .serverName(serverName).call();
}

/**
 * Asks every given server to clear its slow log responses.
 * @param serverNames servers to call; null or empty yields an empty result
 * @return future holding one result per server, in the iteration order of {@code serverNames}
 */
@Override
public CompletableFuture<List<Boolean>> clearSlowLogResponses(
    @Nullable Set<ServerName> serverNames) {
  if (CollectionUtils.isEmpty(serverNames)) {
    return CompletableFuture.completedFuture(Collections.emptyList());
  }
  List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
    .map(this::clearSlowLogsResponses)
    .collect(Collectors.toList());
  return convertToFutureOfList(clearSlowLogResponseList);
}

/**
 * Sends a clear-slow-log request to one server and converts the response with
 * {@code ProtobufUtil::toClearSlowLogPayload}.
 */
private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
  return this.<Boolean>newAdminCaller()
    .action(((controller, stub) -> this
      .adminCall(
        controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
        AdminService.Interface::clearSlowLogsResponses,
        ProtobufUtil::toClearSlowLogPayload))
    ).serverName(serverName).call();
}

/**
 * Turns a list of futures into one future of a list.
 * @param futures futures to combine
 * @return future completed, once all inputs are done, with their results in input order; fails
 *         if any input fails
 */
private static <T> CompletableFuture<List<T>> convertToFutureOfList(
    List<CompletableFuture<T>> futures) {
  CompletableFuture<Void> allDoneFuture =
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  // join() cannot block here: this stage only runs after every input has completed.
  return allDoneFuture.thenApply(v ->
    futures.stream()
      .map(CompletableFuture::join)
      .collect(Collectors.toList())
  );
}

/**
 * Requests balancer decision log entries and converts the response with
 * {@code ProtobufUtil::toBalancerDecisionResponse}.
 * @param limit limit placed in the request
 */
private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
  return this.<List<LogEntry>>newMasterCaller()
    .action((controller, stub) ->
      this.call(controller, stub,
        ProtobufUtil.toBalancerDecisionRequest(limit),
        MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse))
    .call();
}

/**
 * Retrieves log entries of the requested type.
 * <ul>
 * <li>{@code SLOW_LOG} / {@code LARGE_LOG}: collected from {@code serverNames}; not allowed
 * with {@code ServerType.MASTER}.</li>
 * <li>{@code BALANCER_DECISION}: fetched with a single request; not allowed with
 * {@code ServerType.REGION_SERVER}.</li>
 * <li>Any other log type: an empty list.</li>
 * </ul>
 * @throws IllegalArgumentException synchronously, if {@code logType} or {@code serverType} is
 *           null or the combination of the two is not allowed
 */
@Override
public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
    String logType, ServerType serverType, int limit,
    Map<String, Object> filterParams) {
  if (logType == null || serverType == null) {
    throw new IllegalArgumentException("logType and/or serverType cannot be empty");
  }
  if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
    if (ServerType.MASTER.equals(serverType)) {
      throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
    }
    return getSlowLogResponses(filterParams, serverNames, limit, logType);
  } else if (logType.equals("BALANCER_DECISION")) {
    if (ServerType.REGION_SERVER.equals(serverType)) {
      throw new IllegalArgumentException(
        "Balancer Decision logs are not maintained by HRegionServer");
    }
    return getBalancerDecisions(limit);
  }
  return CompletableFuture.completedFuture(Collections.emptyList());
}
}