001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024import com.google.protobuf.Message; 025import com.google.protobuf.RpcChannel; 026import edu.umd.cs.findbugs.annotations.Nullable; 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.Set; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.function.BiConsumer; 043import java.util.function.Consumer; 044import java.util.function.Function; 045import java.util.function.Supplier; 046import java.util.regex.Pattern; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.AsyncMetaTableAccessor; 052import org.apache.hadoop.hbase.CacheEvictionStats; 053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.HRegionLocation; 059import org.apache.hadoop.hbase.MetaTableAccessor; 060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; 061import org.apache.hadoop.hbase.NamespaceDescriptor; 062import org.apache.hadoop.hbase.RegionLocations; 063import org.apache.hadoop.hbase.RegionMetrics; 064import org.apache.hadoop.hbase.RegionMetricsBuilder; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableExistsException; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.TableNotDisabledException; 069import org.apache.hadoop.hbase.TableNotEnabledException; 070import org.apache.hadoop.hbase.TableNotFoundException; 071import org.apache.hadoop.hbase.UnknownRegionException; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 075import org.apache.hadoop.hbase.client.Scan.ReadType; 076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 077import org.apache.hadoop.hbase.client.replication.TableCFs; 078import org.apache.hadoop.hbase.client.security.SecurityCapability; 079import org.apache.hadoop.hbase.exceptions.DeserializationException; 080import org.apache.hadoop.hbase.ipc.HBaseRpcController; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.replication.ReplicationException; 086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 089import org.apache.hadoop.hbase.security.access.Permission; 090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 091import org.apache.hadoop.hbase.security.access.UserPermission; 092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 095import org.apache.hadoop.hbase.util.Bytes; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 098import org.apache.yetus.audience.InterfaceAudience; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 102import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 103import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 104import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 105import org.apache.hbase.thirdparty.io.netty.util.Timeout; 106import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 107import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 108 109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 110import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos 209 .IsSnapshotCleanupEnabledResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos 261 .SetSnapshotCleanupResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 302 303/** 304 * The implementation of AsyncAdmin. 305 * <p> 306 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 307 * be finished inside the rpc framework thread, which means that the callbacks registered to the 308 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 309 * this class should not try to do time consuming tasks in the callbacks. 310 * @since 2.0.0 311 * @see AsyncHBaseAdmin 312 * @see AsyncConnection#getAdmin() 313 * @see AsyncConnection#getAdminBuilder() 314 */ 315@InterfaceAudience.Private 316class RawAsyncHBaseAdmin implements AsyncAdmin { 317 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 318 319 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 320 321 private final AsyncConnectionImpl connection; 322 323 private final HashedWheelTimer retryTimer; 324 325 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 326 327 private final long rpcTimeoutNs; 328 329 private final long operationTimeoutNs; 330 331 private final long pauseNs; 332 333 private final long pauseForCQTBENs; 334 335 private final int maxAttempts; 336 337 private final int startLogErrorsCnt; 338 339 private final NonceGenerator ng; 340 341 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 342 AsyncAdminBuilderBase builder) { 343 this.connection = connection; 344 this.retryTimer = retryTimer; 345 this.metaTable = connection.getTable(META_TABLE_NAME); 346 this.rpcTimeoutNs = builder.rpcTimeoutNs; 347 this.operationTimeoutNs = builder.operationTimeoutNs; 348 this.pauseNs = builder.pauseNs; 349 if (builder.pauseForCQTBENs < builder.pauseNs) { 350 LOG.warn( 351 "Configured value of pauseForCQTBENs is {} ms, which is less than" + 352 " the normal pause value {} ms, use the greater one instead", 353 TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), 354 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 355 this.pauseForCQTBENs = builder.pauseNs; 356 } else { 357 this.pauseForCQTBENs = builder.pauseForCQTBENs; 358 } 359 this.maxAttempts = builder.maxAttempts; 360 this.startLogErrorsCnt = builder.startLogErrorsCnt; 361 this.ng = connection.getNonceGenerator(); 362 } 363 364 private <T> MasterRequestCallerBuilder<T> newMasterCaller() { 365 return this.connection.callerFactory.<T> masterRequest() 366 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 367 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 368 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 369 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 370 } 371 372 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 373 return this.connection.callerFactory.<T> adminRequest() 374 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 375 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 376 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 377 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 378 } 379 380 @FunctionalInterface 381 private interface MasterRpcCall<RESP, REQ> { 382 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 383 RpcCallback<RESP> done); 384 } 385 386 @FunctionalInterface 387 private interface AdminRpcCall<RESP, REQ> { 388 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 389 RpcCallback<RESP> done); 390 } 391 392 @FunctionalInterface 393 private interface Converter<D, S> { 394 D convert(S src) throws IOException; 395 } 396 397 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 398 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 399 Converter<RESP, PRESP> respConverter) { 400 CompletableFuture<RESP> future = new CompletableFuture<>(); 401 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 402 403 @Override 404 public void run(PRESP resp) { 405 if (controller.failed()) { 406 future.completeExceptionally(controller.getFailed()); 407 } else { 408 try { 409 future.complete(respConverter.convert(resp)); 410 } catch (IOException e) { 411 future.completeExceptionally(e); 412 } 413 } 414 } 415 }); 416 return future; 417 } 418 419 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 420 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 421 Converter<RESP, PRESP> respConverter) { 422 CompletableFuture<RESP> future = new CompletableFuture<>(); 423 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 424 425 @Override 426 public void run(PRESP resp) { 427 if (controller.failed()) { 428 future.completeExceptionally(controller.getFailed()); 429 } else { 430 try { 431 future.complete(respConverter.convert(resp)); 432 } catch (IOException e) { 433 future.completeExceptionally(e); 434 } 435 } 436 } 437 }); 438 return future; 439 } 440 441 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 442 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 443 ProcedureBiConsumer consumer) { 444 return procedureCall(b -> { 445 }, preq, rpcCall, respConverter, consumer); 446 } 447 448 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 449 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 450 ProcedureBiConsumer consumer) { 451 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 452 } 453 454 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 455 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 456 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 457 ProcedureBiConsumer consumer) { 458 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 459 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 460 prioritySetter.accept(builder); 461 CompletableFuture<Long> procFuture = builder.call(); 462 CompletableFuture<Void> future = waitProcedureResult(procFuture); 463 addListener(future, consumer); 464 return future; 465 } 466 467 @FunctionalInterface 468 private interface TableOperator { 469 CompletableFuture<Void> operate(TableName table); 470 } 471 472 @Override 473 public CompletableFuture<Boolean> tableExists(TableName tableName) { 474 if (TableName.isMetaTableName(tableName)) { 475 return CompletableFuture.completedFuture(true); 476 } 477 return AsyncMetaTableAccessor.tableExists(metaTable, tableName); 478 } 479 480 @Override 481 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 482 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null, 483 includeSysTables)); 484 } 485 486 /** 487 * {@link #listTableDescriptors(boolean)} 488 */ 489 @Override 490 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 491 boolean includeSysTables) { 492 Preconditions.checkNotNull(pattern, 493 "pattern is null. If you don't specify a pattern, use listTables(boolean) instead"); 494 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern, 495 includeSysTables)); 496 } 497 498 @Override 499 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 500 Preconditions.checkNotNull(tableNames, 501 "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead"); 502 if (tableNames.isEmpty()) { 503 return CompletableFuture.completedFuture(Collections.emptyList()); 504 } 505 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 506 } 507 508 private CompletableFuture<List<TableDescriptor>> 509 getTableDescriptors(GetTableDescriptorsRequest request) { 510 return this.<List<TableDescriptor>> newMasterCaller() 511 .action((controller, stub) -> this 512 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call( 513 controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done), 514 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 515 .call(); 516 } 517 518 @Override 519 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 520 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 521 } 522 523 @Override 524 public CompletableFuture<List<TableName>> 525 listTableNames(Pattern pattern, boolean includeSysTables) { 526 Preconditions.checkNotNull(pattern, 527 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 528 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 529 } 530 531 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 532 return this 533 .<List<TableName>> newMasterCaller() 534 .action( 535 (controller, stub) -> this 536 .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, 537 stub, request, (s, c, req, done) -> s.getTableNames(c, req, done), 538 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call(); 539 } 540 541 @Override 542 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 543 return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this 544 .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse, 545 List<TableDescriptor>> call( 546 controller, stub, 547 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 548 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 549 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 550 .call(); 551 } 552 553 @Override 554 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 555 return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this 556 .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse, 557 List<TableName>> call( 558 controller, stub, 559 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 560 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 561 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 562 .call(); 563 } 564 565 @Override 566 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 567 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 568 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 569 .action((controller, stub) -> this 570 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call( 571 controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), 572 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 573 (resp) -> resp.getTableSchemaList())) 574 .call(), (tableSchemas, error) -> { 575 if (error != null) { 576 future.completeExceptionally(error); 577 return; 578 } 579 if (!tableSchemas.isEmpty()) { 580 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 581 } else { 582 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 583 } 584 }); 585 return future; 586 } 587 588 @Override 589 public CompletableFuture<Void> createTable(TableDescriptor desc) { 590 return createTable(desc.getTableName(), 591 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 592 } 593 594 @Override 595 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 596 int numRegions) { 597 try { 598 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 599 } catch (IllegalArgumentException e) { 600 return failedFuture(e); 601 } 602 } 603 604 @Override 605 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 606 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 607 + " use createTable(TableDescriptor) instead"); 608 try { 609 verifySplitKeys(splitKeys); 610 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 611 splitKeys, ng.getNonceGroup(), ng.newNonce())); 612 } catch (IllegalArgumentException e) { 613 return failedFuture(e); 614 } 615 } 616 617 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 618 Preconditions.checkNotNull(tableName, "table name is null"); 619 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 620 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 621 new CreateTableProcedureBiConsumer(tableName)); 622 } 623 624 @Override 625 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 626 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 627 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 628 ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), 629 (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 630 } 631 632 @Override 633 public CompletableFuture<Void> deleteTable(TableName tableName) { 634 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 635 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 636 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 637 new DeleteTableProcedureBiConsumer(tableName)); 638 } 639 640 @Override 641 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 642 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 643 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 644 ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done), 645 (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName)); 646 } 647 648 @Override 649 public CompletableFuture<Void> enableTable(TableName tableName) { 650 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 651 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 652 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 653 new EnableTableProcedureBiConsumer(tableName)); 654 } 655 656 @Override 657 public CompletableFuture<Void> disableTable(TableName tableName) { 658 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 659 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 660 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 661 new DisableTableProcedureBiConsumer(tableName)); 662 } 663 664 /** 665 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> 666 * using passed parameters. Sets error or boolean result ('true' if table matches 667 * the passed-in targetState). 668 */ 669 private static CompletableFuture<Boolean> completeCheckTableState( 670 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 671 TableState.State targetState, TableName tableName) { 672 if (error != null) { 673 future.completeExceptionally(error); 674 } else { 675 if (tableState != null) { 676 future.complete(tableState.inStates(targetState)); 677 } else { 678 future.completeExceptionally(new TableNotFoundException(tableName)); 679 } 680 } 681 return future; 682 } 683 684 @Override 685 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 686 if (TableName.isMetaTableName(tableName)) { 687 return CompletableFuture.completedFuture(true); 688 } 689 CompletableFuture<Boolean> future = new CompletableFuture<>(); 690 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 691 completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error, 692 TableState.State.ENABLED, tableName); 693 }); 694 return future; 695 } 696 697 @Override 698 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 699 if (TableName.isMetaTableName(tableName)) { 700 return CompletableFuture.completedFuture(false); 701 } 702 CompletableFuture<Boolean> future = new CompletableFuture<>(); 703 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 704 completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error, 705 TableState.State.DISABLED, tableName); 706 }); 707 return future; 708 } 709 710 @Override 711 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 712 return isTableAvailable(tableName, Optional.empty()); 713 } 714 715 @Override 716 public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) { 717 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 718 + " use isTableAvailable(TableName) instead"); 719 return isTableAvailable(tableName, Optional.of(splitKeys)); 720 } 721 722 private CompletableFuture<Boolean> isTableAvailable(TableName tableName, 723 Optional<byte[][]> splitKeys) { 724 if (TableName.isMetaTableName(tableName)) { 725 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 726 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 727 } 728 CompletableFuture<Boolean> future = new CompletableFuture<>(); 729 addListener(isTableEnabled(tableName), (enabled, error) -> { 730 if (error != null) { 731 if (error instanceof TableNotFoundException) { 732 future.complete(false); 733 } else { 734 future.completeExceptionally(error); 735 } 736 return; 737 } 738 if (!enabled) { 739 future.complete(false); 740 } else { 741 addListener( 742 AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 743 (locations, error1) -> { 744 if (error1 != null) { 745 future.completeExceptionally(error1); 746 return; 747 } 748 List<HRegionLocation> notDeployedRegions = locations.stream() 749 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 750 if (notDeployedRegions.size() > 0) { 751 if (LOG.isDebugEnabled()) { 752 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 753 } 754 future.complete(false); 755 return; 756 } 757 758 Optional<Boolean> available = 759 splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); 760 future.complete(available.orElse(true)); 761 }); 762 } 763 }); 764 return future; 765 } 766 767 private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) { 768 int regionCount = 0; 769 for (HRegionLocation location : locations) { 770 RegionInfo info = location.getRegion(); 771 if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 772 regionCount++; 773 continue; 774 } 775 for (byte[] splitKey : splitKeys) { 776 // Just check if the splitkey is available 777 if (Bytes.equals(info.getStartKey(), splitKey)) { 778 regionCount++; 779 break; 780 } 781 } 782 } 783 return regionCount == splitKeys.length + 1; 784 } 785 786 @Override 787 public CompletableFuture<Void> addColumnFamily( 788 TableName tableName, ColumnFamilyDescriptor columnFamily) { 789 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 790 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 791 ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 792 new AddColumnFamilyProcedureBiConsumer(tableName)); 793 } 794 795 @Override 796 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 797 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 798 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 799 ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done), 800 (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName)); 801 } 802 803 @Override 804 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 805 ColumnFamilyDescriptor columnFamily) { 806 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 807 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 808 ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done), 809 (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName)); 810 } 811 812 @Override 813 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 814 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 815 RequestConverter.buildCreateNamespaceRequest(descriptor), 816 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 817 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 818 } 819 820 @Override 821 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 822 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 823 RequestConverter.buildModifyNamespaceRequest(descriptor), 824 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 825 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 826 } 827 828 @Override 829 public CompletableFuture<Void> deleteNamespace(String name) { 830 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 831 RequestConverter.buildDeleteNamespaceRequest(name), 832 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 833 new DeleteNamespaceProcedureBiConsumer(name)); 834 } 835 836 @Override 837 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 838 return this 839 .<NamespaceDescriptor> newMasterCaller() 840 .action( 841 (controller, stub) -> this 842 .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> 843 call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), 844 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) 845 -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call(); 846 } 847 848 @Override 849 public CompletableFuture<List<String>> listNamespaces() { 850 return this 851 .<List<String>> newMasterCaller() 852 .action( 853 (controller, stub) -> this 854 .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call( 855 controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req, 856 done) -> s.listNamespaces(c, req, done), 857 (resp) -> resp.getNamespaceNameList())).call(); 858 } 859 860 @Override 861 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 862 return this 863 .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this 864 .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, 865 List<NamespaceDescriptor>> call(controller, stub, 866 ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) -> 867 s.listNamespaceDescriptors(c, req, done), 868 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call(); 869 } 870 871 @Override 872 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 873 return this.<List<RegionInfo>> newAdminCaller() 874 .action((controller, stub) -> this 875 .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall( 876 controller, stub, RequestConverter.buildGetOnlineRegionRequest(), 877 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 878 resp -> ProtobufUtil.getRegionInfos(resp))) 879 .serverName(serverName).call(); 880 } 881 882 @Override 883 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 884 if (tableName.equals(META_TABLE_NAME)) { 885 return connection.registry.getMetaRegionLocations() 886 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 887 .collect(Collectors.toList())); 888 } else { 889 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName) 890 .thenApply( 891 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 892 } 893 } 894 895 @Override 896 public CompletableFuture<Void> flush(TableName tableName) { 897 CompletableFuture<Void> future = new CompletableFuture<>(); 898 addListener(tableExists(tableName), (exists, err) -> { 899 if (err != null) { 900 future.completeExceptionally(err); 901 } else if (!exists) { 902 future.completeExceptionally(new TableNotFoundException(tableName)); 903 } else { 904 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 905 if (err2 != null) { 906 future.completeExceptionally(err2); 907 } else if (!tableEnabled) { 908 future.completeExceptionally(new TableNotEnabledException(tableName)); 909 } else { 910 addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), 911 new HashMap<>()), (ret, err3) -> { 912 if (err3 != null) { 913 future.completeExceptionally(err3); 914 } else { 915 future.complete(ret); 916 } 917 }); 918 } 919 }); 920 } 921 }); 922 return future; 923 } 924 925 @Override 926 public CompletableFuture<Void> flushRegion(byte[] regionName) { 927 CompletableFuture<Void> future = new CompletableFuture<>(); 928 addListener(getRegionLocation(regionName), (location, err) -> { 929 if (err != null) { 930 future.completeExceptionally(err); 931 return; 932 } 933 ServerName serverName = location.getServerName(); 934 if (serverName == null) { 935 future 936 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 937 return; 938 } 939 addListener(flush(serverName, location.getRegion()), (ret, err2) -> { 940 if (err2 != null) { 941 future.completeExceptionally(err2); 942 } else { 943 future.complete(ret); 944 } 945 }); 946 }); 947 return future; 948 } 949 950 private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) { 951 return this.<Void> newAdminCaller() 952 .serverName(serverName) 953 .action( 954 (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( 955 controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo 956 .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), 957 resp -> null)) 958 .call(); 959 } 960 961 @Override 962 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 963 CompletableFuture<Void> future = new CompletableFuture<>(); 964 addListener(getRegions(sn), (hRegionInfos, err) -> { 965 if (err != null) { 966 future.completeExceptionally(err); 967 return; 968 } 969 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 970 if (hRegionInfos != null) { 971 hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); 972 } 973 addListener(CompletableFuture.allOf( 974 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 975 if (err2 != null) { 976 future.completeExceptionally(err2); 977 } else { 978 future.complete(ret); 979 } 980 }); 981 }); 982 return future; 983 } 984 985 @Override 986 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 987 return compact(tableName, null, false, compactType); 988 } 989 990 @Override 991 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 992 CompactType compactType) { 993 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 994 + "If you don't specify a columnFamily, use compact(TableName) instead"); 995 return compact(tableName, columnFamily, false, compactType); 996 } 997 998 @Override 999 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1000 return compactRegion(regionName, null, false); 1001 } 1002 1003 @Override 1004 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1005 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1006 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1007 return compactRegion(regionName, columnFamily, false); 1008 } 1009 1010 @Override 1011 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1012 return compact(tableName, null, true, compactType); 1013 } 1014 1015 @Override 1016 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1017 CompactType compactType) { 1018 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1019 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1020 return compact(tableName, columnFamily, true, compactType); 1021 } 1022 1023 @Override 1024 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1025 return compactRegion(regionName, null, true); 1026 } 1027 1028 @Override 1029 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1030 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1031 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1032 return compactRegion(regionName, columnFamily, true); 1033 } 1034 1035 @Override 1036 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1037 return compactRegionServer(sn, false); 1038 } 1039 1040 @Override 1041 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1042 return compactRegionServer(sn, true); 1043 } 1044 1045 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1046 CompletableFuture<Void> future = new CompletableFuture<>(); 1047 addListener(getRegions(sn), (hRegionInfos, err) -> { 1048 if (err != null) { 1049 future.completeExceptionally(err); 1050 return; 1051 } 1052 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1053 if (hRegionInfos != null) { 1054 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1055 } 1056 addListener(CompletableFuture.allOf( 1057 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1058 if (err2 != null) { 1059 future.completeExceptionally(err2); 1060 } else { 1061 future.complete(ret); 1062 } 1063 }); 1064 }); 1065 return future; 1066 } 1067 1068 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1069 boolean major) { 1070 CompletableFuture<Void> future = new CompletableFuture<>(); 1071 addListener(getRegionLocation(regionName), (location, err) -> { 1072 if (err != null) { 1073 future.completeExceptionally(err); 1074 return; 1075 } 1076 ServerName serverName = location.getServerName(); 1077 if (serverName == null) { 1078 future 1079 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1080 return; 1081 } 1082 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1083 (ret, err2) -> { 1084 if (err2 != null) { 1085 future.completeExceptionally(err2); 1086 } else { 1087 future.complete(ret); 1088 } 1089 }); 1090 }); 1091 return future; 1092 } 1093 1094 /** 1095 * List all region locations for the specific table. 1096 */ 1097 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1098 if (TableName.META_TABLE_NAME.equals(tableName)) { 1099 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1100 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1101 if (err != null) { 1102 future.completeExceptionally(err); 1103 } else if (metaRegions == null || metaRegions.isEmpty() || 1104 metaRegions.getDefaultRegionLocation() == null) { 1105 future.completeExceptionally(new IOException("meta region does not found")); 1106 } else { 1107 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1108 } 1109 }); 1110 return future; 1111 } else { 1112 // For non-meta table, we fetch all locations by scanning hbase:meta table 1113 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1114 } 1115 } 1116 1117 /** 1118 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1119 */ 1120 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1121 CompactType compactType) { 1122 CompletableFuture<Void> future = new CompletableFuture<>(); 1123 1124 switch (compactType) { 1125 case MOB: 1126 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1127 if (err != null) { 1128 future.completeExceptionally(err); 1129 return; 1130 } 1131 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1132 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1133 if (err2 != null) { 1134 future.completeExceptionally(err2); 1135 } else { 1136 future.complete(ret); 1137 } 1138 }); 1139 }); 1140 break; 1141 case NORMAL: 1142 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1143 if (err != null) { 1144 future.completeExceptionally(err); 1145 return; 1146 } 1147 if (locations == null || locations.isEmpty()) { 1148 future.completeExceptionally(new TableNotFoundException(tableName)); 1149 } 1150 CompletableFuture<?>[] compactFutures = 1151 locations.stream().filter(l -> l.getRegion() != null) 1152 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1153 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1154 .toArray(CompletableFuture<?>[]::new); 1155 // future complete unless all of the compact futures are completed. 1156 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1157 if (err2 != null) { 1158 future.completeExceptionally(err2); 1159 } else { 1160 future.complete(ret); 1161 } 1162 }); 1163 }); 1164 break; 1165 default: 1166 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1167 } 1168 return future; 1169 } 1170 1171 /** 1172 * Compact the region at specific region server. 1173 */ 1174 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1175 final boolean major, byte[] columnFamily) { 1176 return this 1177 .<Void> newAdminCaller() 1178 .serverName(sn) 1179 .action( 1180 (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall( 1181 controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(), 1182 major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done), 1183 resp -> null)).call(); 1184 } 1185 1186 private byte[] toEncodeRegionName(byte[] regionName) { 1187 return RegionInfo.isEncodedRegionName(regionName) ? regionName : 1188 Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1189 } 1190 1191 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1192 CompletableFuture<TableName> result) { 1193 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1194 if (err != null) { 1195 result.completeExceptionally(err); 1196 return; 1197 } 1198 RegionInfo regionInfo = location.getRegion(); 1199 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1200 result.completeExceptionally( 1201 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1202 return; 1203 } 1204 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1205 if (!tableName.get().equals(regionInfo.getTable())) { 1206 // tables of this two region should be same. 1207 result.completeExceptionally( 1208 new IllegalArgumentException("Cannot merge regions from two different tables " + 1209 tableName.get() + " and " + regionInfo.getTable())); 1210 } else { 1211 result.complete(tableName.get()); 1212 } 1213 } 1214 }); 1215 } 1216 1217 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1218 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1219 CompletableFuture<TableName> future = new CompletableFuture<>(); 1220 for (byte[] encodedRegionName : encodedRegionNames) { 1221 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1222 } 1223 return future; 1224 } 1225 1226 @Override 1227 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1228 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1229 } 1230 1231 @Override 1232 public CompletableFuture<Boolean> isMergeEnabled() { 1233 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1234 } 1235 1236 @Override 1237 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1238 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1239 } 1240 1241 @Override 1242 public CompletableFuture<Boolean> isSplitEnabled() { 1243 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1244 } 1245 1246 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1247 MasterSwitchType switchType) { 1248 SetSplitOrMergeEnabledRequest request = 1249 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1250 return this.<Boolean> newMasterCaller() 1251 .action((controller, stub) -> this 1252 .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller, 1253 stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1254 (resp) -> resp.getPrevValueList().get(0))) 1255 .call(); 1256 } 1257 1258 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1259 IsSplitOrMergeEnabledRequest request = 1260 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1261 return this 1262 .<Boolean> newMasterCaller() 1263 .action( 1264 (controller, stub) -> this 1265 .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call( 1266 controller, stub, request, 1267 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), 1268 (resp) -> resp.getEnabled())).call(); 1269 } 1270 1271 @Override 1272 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1273 if (nameOfRegionsToMerge.size() < 2) { 1274 return failedFuture(new IllegalArgumentException( 1275 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1276 } 1277 CompletableFuture<Void> future = new CompletableFuture<>(); 1278 byte[][] encodedNameOfRegionsToMerge = 1279 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1280 1281 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1282 if (err != null) { 1283 future.completeExceptionally(err); 1284 return; 1285 } 1286 1287 final MergeTableRegionsRequest request; 1288 try { 1289 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1290 forcible, ng.getNonceGroup(), ng.newNonce()); 1291 } catch (DeserializationException e) { 1292 future.completeExceptionally(e); 1293 return; 1294 } 1295 1296 addListener( 1297 this.procedureCall(tableName, request, 1298 MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId, 1299 new MergeTableRegionProcedureBiConsumer(tableName)), 1300 (ret, err2) -> { 1301 if (err2 != null) { 1302 future.completeExceptionally(err2); 1303 } else { 1304 future.complete(ret); 1305 } 1306 }); 1307 }); 1308 return future; 1309 } 1310 1311 @Override 1312 public CompletableFuture<Void> split(TableName tableName) { 1313 CompletableFuture<Void> future = new CompletableFuture<>(); 1314 addListener(tableExists(tableName), (exist, error) -> { 1315 if (error != null) { 1316 future.completeExceptionally(error); 1317 return; 1318 } 1319 if (!exist) { 1320 future.completeExceptionally(new TableNotFoundException(tableName)); 1321 return; 1322 } 1323 addListener( 1324 metaTable 1325 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1326 .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) 1327 .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))), 1328 (results, err2) -> { 1329 if (err2 != null) { 1330 future.completeExceptionally(err2); 1331 return; 1332 } 1333 if (results != null && !results.isEmpty()) { 1334 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1335 for (Result r : results) { 1336 if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) { 1337 continue; 1338 } 1339 RegionLocations rl = MetaTableAccessor.getRegionLocations(r); 1340 if (rl != null) { 1341 for (HRegionLocation h : rl.getRegionLocations()) { 1342 if (h != null && h.getServerName() != null) { 1343 RegionInfo hri = h.getRegion(); 1344 if (hri == null || hri.isSplitParent() || 1345 hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1346 continue; 1347 } 1348 splitFutures.add(split(hri, null)); 1349 } 1350 } 1351 } 1352 } 1353 addListener( 1354 CompletableFuture 1355 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1356 (ret, exception) -> { 1357 if (exception != null) { 1358 future.completeExceptionally(exception); 1359 return; 1360 } 1361 future.complete(ret); 1362 }); 1363 } else { 1364 future.complete(null); 1365 } 1366 }); 1367 }); 1368 return future; 1369 } 1370 1371 @Override 1372 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1373 CompletableFuture<Void> result = new CompletableFuture<>(); 1374 if (splitPoint == null) { 1375 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1376 } 1377 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1378 (loc, err) -> { 1379 if (err != null) { 1380 result.completeExceptionally(err); 1381 } else if (loc == null || loc.getRegion() == null) { 1382 result.completeExceptionally(new IllegalArgumentException( 1383 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1384 } else { 1385 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1386 if (err2 != null) { 1387 result.completeExceptionally(err2); 1388 } else { 1389 result.complete(ret); 1390 } 1391 1392 }); 1393 } 1394 }); 1395 return result; 1396 } 1397 1398 @Override 1399 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1400 CompletableFuture<Void> future = new CompletableFuture<>(); 1401 addListener(getRegionLocation(regionName), (location, err) -> { 1402 if (err != null) { 1403 future.completeExceptionally(err); 1404 return; 1405 } 1406 RegionInfo regionInfo = location.getRegion(); 1407 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1408 future 1409 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + 1410 "Replicas are auto-split when their primary is split.")); 1411 return; 1412 } 1413 ServerName serverName = location.getServerName(); 1414 if (serverName == null) { 1415 future 1416 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1417 return; 1418 } 1419 addListener(split(regionInfo, null), (ret, err2) -> { 1420 if (err2 != null) { 1421 future.completeExceptionally(err2); 1422 } else { 1423 future.complete(ret); 1424 } 1425 }); 1426 }); 1427 return future; 1428 } 1429 1430 @Override 1431 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1432 Preconditions.checkNotNull(splitPoint, 1433 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1434 CompletableFuture<Void> future = new CompletableFuture<>(); 1435 addListener(getRegionLocation(regionName), (location, err) -> { 1436 if (err != null) { 1437 future.completeExceptionally(err); 1438 return; 1439 } 1440 RegionInfo regionInfo = location.getRegion(); 1441 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1442 future 1443 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + 1444 "Replicas are auto-split when their primary is split.")); 1445 return; 1446 } 1447 ServerName serverName = location.getServerName(); 1448 if (serverName == null) { 1449 future 1450 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1451 return; 1452 } 1453 if (regionInfo.getStartKey() != null && 1454 Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) { 1455 future.completeExceptionally( 1456 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1457 return; 1458 } 1459 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1460 if (err2 != null) { 1461 future.completeExceptionally(err2); 1462 } else { 1463 future.complete(ret); 1464 } 1465 }); 1466 }); 1467 return future; 1468 } 1469 1470 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1471 CompletableFuture<Void> future = new CompletableFuture<>(); 1472 TableName tableName = hri.getTable(); 1473 final SplitTableRegionRequest request; 1474 try { 1475 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1476 ng.newNonce()); 1477 } catch (DeserializationException e) { 1478 future.completeExceptionally(e); 1479 return future; 1480 } 1481 1482 addListener( 1483 this.procedureCall(tableName, 1484 request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId, 1485 new SplitTableRegionProcedureBiConsumer(tableName)), 1486 (ret, err2) -> { 1487 if (err2 != null) { 1488 future.completeExceptionally(err2); 1489 } else { 1490 future.complete(ret); 1491 } 1492 }); 1493 return future; 1494 } 1495 1496 @Override 1497 public CompletableFuture<Void> assign(byte[] regionName) { 1498 CompletableFuture<Void> future = new CompletableFuture<>(); 1499 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1500 if (err != null) { 1501 future.completeExceptionally(err); 1502 return; 1503 } 1504 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1505 .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1506 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1507 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) 1508 .call(), (ret, err2) -> { 1509 if (err2 != null) { 1510 future.completeExceptionally(err2); 1511 } else { 1512 future.complete(ret); 1513 } 1514 }); 1515 }); 1516 return future; 1517 } 1518 1519 @Override 1520 public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) { 1521 CompletableFuture<Void> future = new CompletableFuture<>(); 1522 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1523 if (err != null) { 1524 future.completeExceptionally(err); 1525 return; 1526 } 1527 addListener( 1528 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1529 .action(((controller, stub) -> this 1530 .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, 1531 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), 1532 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) 1533 .call(), 1534 (ret, err2) -> { 1535 if (err2 != null) { 1536 future.completeExceptionally(err2); 1537 } else { 1538 future.complete(ret); 1539 } 1540 }); 1541 }); 1542 return future; 1543 } 1544 1545 @Override 1546 public CompletableFuture<Void> offline(byte[] regionName) { 1547 CompletableFuture<Void> future = new CompletableFuture<>(); 1548 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1549 if (err != null) { 1550 future.completeExceptionally(err); 1551 return; 1552 } 1553 addListener( 1554 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1555 .action(((controller, stub) -> this 1556 .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub, 1557 RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1558 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))) 1559 .call(), 1560 (ret, err2) -> { 1561 if (err2 != null) { 1562 future.completeExceptionally(err2); 1563 } else { 1564 future.complete(ret); 1565 } 1566 }); 1567 }); 1568 return future; 1569 } 1570 1571 @Override 1572 public CompletableFuture<Void> move(byte[] regionName) { 1573 CompletableFuture<Void> future = new CompletableFuture<>(); 1574 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1575 if (err != null) { 1576 future.completeExceptionally(err); 1577 return; 1578 } 1579 addListener( 1580 moveRegion(regionInfo, 1581 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1582 (ret, err2) -> { 1583 if (err2 != null) { 1584 future.completeExceptionally(err2); 1585 } else { 1586 future.complete(ret); 1587 } 1588 }); 1589 }); 1590 return future; 1591 } 1592 1593 @Override 1594 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1595 Preconditions.checkNotNull(destServerName, 1596 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1597 CompletableFuture<Void> future = new CompletableFuture<>(); 1598 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1599 if (err != null) { 1600 future.completeExceptionally(err); 1601 return; 1602 } 1603 addListener( 1604 moveRegion(regionInfo, RequestConverter 1605 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1606 (ret, err2) -> { 1607 if (err2 != null) { 1608 future.completeExceptionally(err2); 1609 } else { 1610 future.complete(ret); 1611 } 1612 }); 1613 }); 1614 return future; 1615 } 1616 1617 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1618 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1619 .action( 1620 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1621 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1622 .call(); 1623 } 1624 1625 @Override 1626 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1627 return this 1628 .<Void> newMasterCaller() 1629 .action( 1630 (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1631 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1632 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call(); 1633 } 1634 1635 @Override 1636 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1637 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1638 Scan scan = QuotaTableUtil.makeScan(filter); 1639 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() 1640 .scan(scan, new AdvancedScanResultConsumer() { 1641 List<QuotaSettings> settings = new ArrayList<>(); 1642 1643 @Override 1644 public void onNext(Result[] results, ScanController controller) { 1645 for (Result result : results) { 1646 try { 1647 QuotaTableUtil.parseResultToCollection(result, settings); 1648 } catch (IOException e) { 1649 controller.terminate(); 1650 future.completeExceptionally(e); 1651 } 1652 } 1653 } 1654 1655 @Override 1656 public void onError(Throwable error) { 1657 future.completeExceptionally(error); 1658 } 1659 1660 @Override 1661 public void onComplete() { 1662 future.complete(settings); 1663 } 1664 }); 1665 return future; 1666 } 1667 1668 @Override 1669 public CompletableFuture<Void> addReplicationPeer(String peerId, 1670 ReplicationPeerConfig peerConfig, boolean enabled) { 1671 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1672 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1673 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1674 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1675 } 1676 1677 @Override 1678 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1679 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1680 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1681 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1682 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1683 } 1684 1685 @Override 1686 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1687 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1688 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1689 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1690 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1691 } 1692 1693 @Override 1694 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1695 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1696 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1697 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1698 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1699 } 1700 1701 @Override 1702 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1703 return this 1704 .<ReplicationPeerConfig> newMasterCaller() 1705 .action( 1706 (controller, stub) -> this 1707 .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call( 1708 controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( 1709 s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1710 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call(); 1711 } 1712 1713 @Override 1714 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1715 ReplicationPeerConfig peerConfig) { 1716 return this 1717 .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall( 1718 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1719 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1720 (resp) -> resp.getProcId(), 1721 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1722 } 1723 1724 @Override 1725 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1726 Map<TableName, List<String>> tableCfs) { 1727 if (tableCfs == null) { 1728 return failedFuture(new ReplicationException("tableCfs is null")); 1729 } 1730 1731 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1732 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1733 if (!completeExceptionally(future, error)) { 1734 ReplicationPeerConfig newPeerConfig = 1735 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1736 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1737 if (!completeExceptionally(future, error)) { 1738 future.complete(result); 1739 } 1740 }); 1741 } 1742 }); 1743 return future; 1744 } 1745 1746 @Override 1747 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1748 Map<TableName, List<String>> tableCfs) { 1749 if (tableCfs == null) { 1750 return failedFuture(new ReplicationException("tableCfs is null")); 1751 } 1752 1753 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1754 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1755 if (!completeExceptionally(future, error)) { 1756 ReplicationPeerConfig newPeerConfig = null; 1757 try { 1758 newPeerConfig = ReplicationPeerConfigUtil 1759 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1760 } catch (ReplicationException e) { 1761 future.completeExceptionally(e); 1762 return; 1763 } 1764 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1765 if (!completeExceptionally(future, error)) { 1766 future.complete(result); 1767 } 1768 }); 1769 } 1770 }); 1771 return future; 1772 } 1773 1774 @Override 1775 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1776 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1777 } 1778 1779 @Override 1780 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1781 Preconditions.checkNotNull(pattern, 1782 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1783 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1784 } 1785 1786 private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers( 1787 ListReplicationPeersRequest request) { 1788 return this 1789 .<List<ReplicationPeerDescription>> newMasterCaller() 1790 .action( 1791 (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1792 List<ReplicationPeerDescription>> call(controller, stub, request, 1793 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1794 (resp) -> resp.getPeerDescList().stream() 1795 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1796 .collect(Collectors.toList()))).call(); 1797 } 1798 1799 @Override 1800 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1801 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1802 addListener(listTableDescriptors(), (tables, error) -> { 1803 if (!completeExceptionally(future, error)) { 1804 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1805 tables.forEach(table -> { 1806 Map<String, Integer> cfs = new HashMap<>(); 1807 Stream.of(table.getColumnFamilies()) 1808 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1809 .forEach(column -> { 1810 cfs.put(column.getNameAsString(), column.getScope()); 1811 }); 1812 if (!cfs.isEmpty()) { 1813 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1814 } 1815 }); 1816 future.complete(replicatedTableCFs); 1817 } 1818 }); 1819 return future; 1820 } 1821 1822 @Override 1823 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1824 SnapshotProtos.SnapshotDescription snapshot = 1825 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1826 try { 1827 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1828 } catch (IllegalArgumentException e) { 1829 return failedFuture(e); 1830 } 1831 CompletableFuture<Void> future = new CompletableFuture<>(); 1832 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); 1833 addListener(this.<Long> newMasterCaller() 1834 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, 1835 stub, request, (s, c, req, done) -> s.snapshot(c, req, done), 1836 resp -> resp.getExpectedTimeout())) 1837 .call(), (expectedTimeout, err) -> { 1838 if (err != null) { 1839 future.completeExceptionally(err); 1840 return; 1841 } 1842 TimerTask pollingTask = new TimerTask() { 1843 int tries = 0; 1844 long startTime = EnvironmentEdgeManager.currentTime(); 1845 long endTime = startTime + expectedTimeout; 1846 long maxPauseTime = expectedTimeout / maxAttempts; 1847 1848 @Override 1849 public void run(Timeout timeout) throws Exception { 1850 if (EnvironmentEdgeManager.currentTime() < endTime) { 1851 addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> { 1852 if (err2 != null) { 1853 future.completeExceptionally(err2); 1854 } else if (done) { 1855 future.complete(null); 1856 } else { 1857 // retry again after pauseTime. 1858 long pauseTime = 1859 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 1860 pauseTime = Math.min(pauseTime, maxPauseTime); 1861 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 1862 TimeUnit.MILLISECONDS); 1863 } 1864 }); 1865 } else { 1866 future.completeExceptionally( 1867 new SnapshotCreationException("Snapshot '" + snapshot.getName() + 1868 "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc)); 1869 } 1870 } 1871 }; 1872 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 1873 }); 1874 return future; 1875 } 1876 1877 @Override 1878 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 1879 return this 1880 .<Boolean> newMasterCaller() 1881 .action( 1882 (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call( 1883 controller, 1884 stub, 1885 IsSnapshotDoneRequest.newBuilder() 1886 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 1887 req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call(); 1888 } 1889 1890 @Override 1891 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 1892 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 1893 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 1894 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 1895 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 1896 } 1897 1898 @Override 1899 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 1900 boolean restoreAcl) { 1901 CompletableFuture<Void> future = new CompletableFuture<>(); 1902 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 1903 if (err != null) { 1904 future.completeExceptionally(err); 1905 return; 1906 } 1907 TableName tableName = null; 1908 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 1909 for (SnapshotDescription snap : snapshotDescriptions) { 1910 if (snap.getName().equals(snapshotName)) { 1911 tableName = snap.getTableName(); 1912 break; 1913 } 1914 } 1915 } 1916 if (tableName == null) { 1917 future.completeExceptionally(new RestoreSnapshotException( 1918 "Unable to find the table name for snapshot=" + snapshotName)); 1919 return; 1920 } 1921 final TableName finalTableName = tableName; 1922 addListener(tableExists(finalTableName), (exists, err2) -> { 1923 if (err2 != null) { 1924 future.completeExceptionally(err2); 1925 } else if (!exists) { 1926 // if table does not exist, then just clone snapshot into new table. 1927 completeConditionalOnFuture(future, 1928 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl)); 1929 } else { 1930 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 1931 if (err4 != null) { 1932 future.completeExceptionally(err4); 1933 } else if (!disabled) { 1934 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 1935 } else { 1936 completeConditionalOnFuture(future, 1937 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 1938 } 1939 }); 1940 } 1941 }); 1942 }); 1943 return future; 1944 } 1945 1946 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 1947 boolean takeFailSafeSnapshot, boolean restoreAcl) { 1948 if (takeFailSafeSnapshot) { 1949 CompletableFuture<Void> future = new CompletableFuture<>(); 1950 // Step.1 Take a snapshot of the current state 1951 String failSafeSnapshotSnapshotNameFormat = 1952 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 1953 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 1954 final String failSafeSnapshotSnapshotName = 1955 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 1956 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 1957 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 1958 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1959 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 1960 if (err != null) { 1961 future.completeExceptionally(err); 1962 } else { 1963 // Step.2 Restore snapshot 1964 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl), 1965 (void2, err2) -> { 1966 if (err2 != null) { 1967 // Step.3.a Something went wrong during the restore and try to rollback. 1968 addListener( 1969 internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl), 1970 (void3, err3) -> { 1971 if (err3 != null) { 1972 future.completeExceptionally(err3); 1973 } else { 1974 String msg = 1975 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" + 1976 failSafeSnapshotSnapshotName + " succeeded."; 1977 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 1978 } 1979 }); 1980 } else { 1981 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 1982 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1983 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 1984 if (err3 != null) { 1985 LOG.error( 1986 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 1987 err3); 1988 future.completeExceptionally(err3); 1989 } else { 1990 future.complete(ret3); 1991 } 1992 }); 1993 } 1994 }); 1995 } 1996 }); 1997 return future; 1998 } else { 1999 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl); 2000 } 2001 } 2002 2003 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2004 CompletableFuture<T> parentFuture) { 2005 addListener(parentFuture, (res, err) -> { 2006 if (err != null) { 2007 dependentFuture.completeExceptionally(err); 2008 } else { 2009 dependentFuture.complete(res); 2010 } 2011 }); 2012 } 2013 2014 @Override 2015 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2016 boolean restoreAcl) { 2017 CompletableFuture<Void> future = new CompletableFuture<>(); 2018 addListener(tableExists(tableName), (exists, err) -> { 2019 if (err != null) { 2020 future.completeExceptionally(err); 2021 } else if (exists) { 2022 future.completeExceptionally(new TableExistsException(tableName)); 2023 } else { 2024 completeConditionalOnFuture(future, 2025 internalRestoreSnapshot(snapshotName, tableName, restoreAcl)); 2026 } 2027 }); 2028 return future; 2029 } 2030 2031 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2032 boolean restoreAcl) { 2033 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2034 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2035 try { 2036 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2037 } catch (IllegalArgumentException e) { 2038 return failedFuture(e); 2039 } 2040 return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this 2041 .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub, 2042 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2043 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(), 2044 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2045 .call()); 2046 } 2047 2048 @Override 2049 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2050 return getCompletedSnapshots(null); 2051 } 2052 2053 @Override 2054 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2055 Preconditions.checkNotNull(pattern, 2056 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2057 return getCompletedSnapshots(pattern); 2058 } 2059 2060 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2061 return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this 2062 .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> 2063 call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), 2064 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2065 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2066 .call(); 2067 } 2068 2069 @Override 2070 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2071 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2072 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2073 return getCompletedSnapshots(tableNamePattern, null); 2074 } 2075 2076 @Override 2077 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2078 Pattern snapshotNamePattern) { 2079 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2080 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2081 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2082 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2083 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2084 } 2085 2086 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots( 2087 Pattern tableNamePattern, Pattern snapshotNamePattern) { 2088 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2089 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2090 if (err != null) { 2091 future.completeExceptionally(err); 2092 return; 2093 } 2094 if (tableNames == null || tableNames.size() <= 0) { 2095 future.complete(Collections.emptyList()); 2096 return; 2097 } 2098 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2099 if (err2 != null) { 2100 future.completeExceptionally(err2); 2101 return; 2102 } 2103 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2104 future.complete(Collections.emptyList()); 2105 return; 2106 } 2107 future.complete(snapshotDescList.stream() 2108 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2109 .collect(Collectors.toList())); 2110 }); 2111 }); 2112 return future; 2113 } 2114 2115 @Override 2116 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2117 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2118 } 2119 2120 @Override 2121 public CompletableFuture<Void> deleteSnapshots() { 2122 return internalDeleteSnapshots(null, null); 2123 } 2124 2125 @Override 2126 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2127 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2128 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2129 return internalDeleteSnapshots(null, snapshotNamePattern); 2130 } 2131 2132 @Override 2133 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2134 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2135 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2136 return internalDeleteSnapshots(tableNamePattern, null); 2137 } 2138 2139 @Override 2140 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2141 Pattern snapshotNamePattern) { 2142 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2143 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2144 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2145 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2146 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2147 } 2148 2149 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2150 Pattern snapshotNamePattern) { 2151 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2152 if (tableNamePattern == null) { 2153 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2154 } else { 2155 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2156 } 2157 CompletableFuture<Void> future = new CompletableFuture<>(); 2158 addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> { 2159 if (err != null) { 2160 future.completeExceptionally(err); 2161 return; 2162 } 2163 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2164 future.complete(null); 2165 return; 2166 } 2167 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2168 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2169 if (e != null) { 2170 future.completeExceptionally(e); 2171 } else { 2172 future.complete(v); 2173 } 2174 }); 2175 })); 2176 return future; 2177 } 2178 2179 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2180 return this 2181 .<Void> newMasterCaller() 2182 .action( 2183 (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2184 controller, 2185 stub, 2186 DeleteSnapshotRequest.newBuilder() 2187 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 2188 req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call(); 2189 } 2190 2191 @Override 2192 public CompletableFuture<Void> execProcedure(String signature, String instance, 2193 Map<String, String> props) { 2194 CompletableFuture<Void> future = new CompletableFuture<>(); 2195 ProcedureDescription procDesc = 2196 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2197 addListener(this.<Long> newMasterCaller() 2198 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2199 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2200 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2201 .call(), (expectedTimeout, err) -> { 2202 if (err != null) { 2203 future.completeExceptionally(err); 2204 return; 2205 } 2206 TimerTask pollingTask = new TimerTask() { 2207 int tries = 0; 2208 long startTime = EnvironmentEdgeManager.currentTime(); 2209 long endTime = startTime + expectedTimeout; 2210 long maxPauseTime = expectedTimeout / maxAttempts; 2211 2212 @Override 2213 public void run(Timeout timeout) throws Exception { 2214 if (EnvironmentEdgeManager.currentTime() < endTime) { 2215 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2216 if (err2 != null) { 2217 future.completeExceptionally(err2); 2218 return; 2219 } 2220 if (done) { 2221 future.complete(null); 2222 } else { 2223 // retry again after pauseTime. 2224 long pauseTime = 2225 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2226 pauseTime = Math.min(pauseTime, maxPauseTime); 2227 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2228 TimeUnit.MICROSECONDS); 2229 } 2230 }); 2231 } else { 2232 future.completeExceptionally(new IOException("Procedure '" + signature + " : " + 2233 instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2234 } 2235 } 2236 }; 2237 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2238 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2239 }); 2240 return future; 2241 } 2242 2243 @Override 2244 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2245 Map<String, String> props) { 2246 ProcedureDescription proDesc = 2247 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2248 return this.<byte[]> newMasterCaller() 2249 .action( 2250 (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2251 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2252 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2253 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2254 .call(); 2255 } 2256 2257 @Override 2258 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2259 Map<String, String> props) { 2260 ProcedureDescription proDesc = 2261 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2262 return this.<Boolean> newMasterCaller() 2263 .action((controller, stub) -> this 2264 .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub, 2265 IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2266 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2267 .call(); 2268 } 2269 2270 @Override 2271 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2272 return this.<Boolean> newMasterCaller().action( 2273 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2274 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2275 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2276 .call(); 2277 } 2278 2279 @Override 2280 public CompletableFuture<String> getProcedures() { 2281 return this 2282 .<String> newMasterCaller() 2283 .action( 2284 (controller, stub) -> this 2285 .<GetProceduresRequest, GetProceduresResponse, String> call( 2286 controller, stub, GetProceduresRequest.newBuilder().build(), 2287 (s, c, req, done) -> s.getProcedures(c, req, done), 2288 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call(); 2289 } 2290 2291 @Override 2292 public CompletableFuture<String> getLocks() { 2293 return this 2294 .<String> newMasterCaller() 2295 .action( 2296 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call( 2297 controller, stub, GetLocksRequest.newBuilder().build(), 2298 (s, c, req, done) -> s.getLocks(c, req, done), 2299 resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call(); 2300 } 2301 2302 @Override 2303 public CompletableFuture<Void> decommissionRegionServers( 2304 List<ServerName> servers, boolean offload) { 2305 return this.<Void> newMasterCaller() 2306 .action((controller, stub) -> this 2307 .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call( 2308 controller, stub, 2309 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2310 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2311 .call(); 2312 } 2313 2314 @Override 2315 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2316 return this.<List<ServerName>> newMasterCaller() 2317 .action((controller, stub) -> this 2318 .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse, 2319 List<ServerName>> call( 2320 controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(), 2321 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2322 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2323 .collect(Collectors.toList()))) 2324 .call(); 2325 } 2326 2327 @Override 2328 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2329 List<byte[]> encodedRegionNames) { 2330 return this.<Void> newMasterCaller() 2331 .action((controller, stub) -> 2332 this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call( 2333 controller, stub, RequestConverter.buildRecommissionRegionServerRequest( 2334 server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer( 2335 c, req, done), resp -> null)).call(); 2336 } 2337 2338 /** 2339 * Get the region location for the passed region name. The region name may be a full region name 2340 * or encoded region name. If the region does not found, then it'll throw an 2341 * UnknownRegionException wrapped by a {@link CompletableFuture} 2342 * @param regionNameOrEncodedRegionName region name or encoded region name 2343 * @return region location, wrapped by a {@link CompletableFuture} 2344 */ 2345 @VisibleForTesting 2346 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2347 if (regionNameOrEncodedRegionName == null) { 2348 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2349 } 2350 try { 2351 CompletableFuture<Optional<HRegionLocation>> future; 2352 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2353 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2354 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2355 // old format encodedName, should be meta region 2356 future = connection.registry.getMetaRegionLocations() 2357 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2358 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2359 } else { 2360 future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2361 regionNameOrEncodedRegionName); 2362 } 2363 } else { 2364 RegionInfo regionInfo = 2365 MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2366 if (regionInfo.isMetaRegion()) { 2367 future = connection.registry.getMetaRegionLocations() 2368 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2369 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2370 .findFirst()); 2371 } else { 2372 future = 2373 AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2374 } 2375 } 2376 2377 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2378 addListener(future, (location, err) -> { 2379 if (err != null) { 2380 returnedFuture.completeExceptionally(err); 2381 return; 2382 } 2383 if (!location.isPresent() || location.get().getRegion() == null) { 2384 returnedFuture.completeExceptionally( 2385 new UnknownRegionException("Invalid region name or encoded region name: " + 2386 Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2387 } else { 2388 returnedFuture.complete(location.get()); 2389 } 2390 }); 2391 return returnedFuture; 2392 } catch (IOException e) { 2393 return failedFuture(e); 2394 } 2395 } 2396 2397 /** 2398 * Get the region info for the passed region name. The region name may be a full region name or 2399 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2400 * wrapped by a {@link CompletableFuture} 2401 * @return region info, wrapped by a {@link CompletableFuture} 2402 */ 2403 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2404 if (regionNameOrEncodedRegionName == null) { 2405 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2406 } 2407 2408 if (Bytes.equals(regionNameOrEncodedRegionName, 2409 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) || 2410 Bytes.equals(regionNameOrEncodedRegionName, 2411 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { 2412 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2413 } 2414 2415 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2416 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2417 if (err != null) { 2418 future.completeExceptionally(err); 2419 } else { 2420 future.complete(location.getRegion()); 2421 } 2422 }); 2423 return future; 2424 } 2425 2426 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2427 if (numRegions < 3) { 2428 throw new IllegalArgumentException("Must create at least three regions"); 2429 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2430 throw new IllegalArgumentException("Start key must be smaller than end key"); 2431 } 2432 if (numRegions == 3) { 2433 return new byte[][] { startKey, endKey }; 2434 } 2435 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2436 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2437 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2438 } 2439 return splitKeys; 2440 } 2441 2442 private void verifySplitKeys(byte[][] splitKeys) { 2443 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2444 // Verify there are no duplicate split keys 2445 byte[] lastKey = null; 2446 for (byte[] splitKey : splitKeys) { 2447 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2448 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2449 } 2450 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2451 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2452 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2453 } 2454 lastKey = splitKey; 2455 } 2456 } 2457 2458 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2459 2460 abstract void onFinished(); 2461 2462 abstract void onError(Throwable error); 2463 2464 @Override 2465 public void accept(Void v, Throwable error) { 2466 if (error != null) { 2467 onError(error); 2468 return; 2469 } 2470 onFinished(); 2471 } 2472 } 2473 2474 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2475 protected final TableName tableName; 2476 2477 TableProcedureBiConsumer(TableName tableName) { 2478 this.tableName = tableName; 2479 } 2480 2481 abstract String getOperationType(); 2482 2483 String getDescription() { 2484 return "Operation: " + getOperationType() + ", " + "Table Name: " 2485 + tableName.getNameWithNamespaceInclAsString(); 2486 } 2487 2488 @Override 2489 void onFinished() { 2490 LOG.info(getDescription() + " completed"); 2491 } 2492 2493 @Override 2494 void onError(Throwable error) { 2495 LOG.info(getDescription() + " failed with " + error.getMessage()); 2496 } 2497 } 2498 2499 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2500 protected final String namespaceName; 2501 2502 NamespaceProcedureBiConsumer(String namespaceName) { 2503 this.namespaceName = namespaceName; 2504 } 2505 2506 abstract String getOperationType(); 2507 2508 String getDescription() { 2509 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2510 } 2511 2512 @Override 2513 void onFinished() { 2514 LOG.info(getDescription() + " completed"); 2515 } 2516 2517 @Override 2518 void onError(Throwable error) { 2519 LOG.info(getDescription() + " failed with " + error.getMessage()); 2520 } 2521 } 2522 2523 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2524 2525 CreateTableProcedureBiConsumer(TableName tableName) { 2526 super(tableName); 2527 } 2528 2529 @Override 2530 String getOperationType() { 2531 return "CREATE"; 2532 } 2533 } 2534 2535 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2536 2537 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2538 super(tableName); 2539 } 2540 2541 @Override 2542 String getOperationType() { 2543 return "ENABLE"; 2544 } 2545 } 2546 2547 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2548 2549 DeleteTableProcedureBiConsumer(TableName tableName) { 2550 super(tableName); 2551 } 2552 2553 @Override 2554 String getOperationType() { 2555 return "DELETE"; 2556 } 2557 2558 @Override 2559 void onFinished() { 2560 connection.getLocator().clearCache(this.tableName); 2561 super.onFinished(); 2562 } 2563 } 2564 2565 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2566 2567 TruncateTableProcedureBiConsumer(TableName tableName) { 2568 super(tableName); 2569 } 2570 2571 @Override 2572 String getOperationType() { 2573 return "TRUNCATE"; 2574 } 2575 } 2576 2577 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2578 2579 EnableTableProcedureBiConsumer(TableName tableName) { 2580 super(tableName); 2581 } 2582 2583 @Override 2584 String getOperationType() { 2585 return "ENABLE"; 2586 } 2587 } 2588 2589 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2590 2591 DisableTableProcedureBiConsumer(TableName tableName) { 2592 super(tableName); 2593 } 2594 2595 @Override 2596 String getOperationType() { 2597 return "DISABLE"; 2598 } 2599 } 2600 2601 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2602 2603 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2604 super(tableName); 2605 } 2606 2607 @Override 2608 String getOperationType() { 2609 return "ADD_COLUMN_FAMILY"; 2610 } 2611 } 2612 2613 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2614 2615 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2616 super(tableName); 2617 } 2618 2619 @Override 2620 String getOperationType() { 2621 return "DELETE_COLUMN_FAMILY"; 2622 } 2623 } 2624 2625 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2626 2627 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2628 super(tableName); 2629 } 2630 2631 @Override 2632 String getOperationType() { 2633 return "MODIFY_COLUMN_FAMILY"; 2634 } 2635 } 2636 2637 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2638 2639 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2640 super(namespaceName); 2641 } 2642 2643 @Override 2644 String getOperationType() { 2645 return "CREATE_NAMESPACE"; 2646 } 2647 } 2648 2649 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2650 2651 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2652 super(namespaceName); 2653 } 2654 2655 @Override 2656 String getOperationType() { 2657 return "DELETE_NAMESPACE"; 2658 } 2659 } 2660 2661 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2662 2663 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2664 super(namespaceName); 2665 } 2666 2667 @Override 2668 String getOperationType() { 2669 return "MODIFY_NAMESPACE"; 2670 } 2671 } 2672 2673 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2674 2675 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2676 super(tableName); 2677 } 2678 2679 @Override 2680 String getOperationType() { 2681 return "MERGE_REGIONS"; 2682 } 2683 } 2684 2685 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2686 2687 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2688 super(tableName); 2689 } 2690 2691 @Override 2692 String getOperationType() { 2693 return "SPLIT_REGION"; 2694 } 2695 } 2696 2697 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2698 private final String peerId; 2699 private final Supplier<String> getOperation; 2700 2701 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2702 this.peerId = peerId; 2703 this.getOperation = getOperation; 2704 } 2705 2706 String getDescription() { 2707 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2708 } 2709 2710 @Override 2711 void onFinished() { 2712 LOG.info(getDescription() + " completed"); 2713 } 2714 2715 @Override 2716 void onError(Throwable error) { 2717 LOG.info(getDescription() + " failed with " + error.getMessage()); 2718 } 2719 } 2720 2721 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2722 CompletableFuture<Void> future = new CompletableFuture<>(); 2723 addListener(procFuture, (procId, error) -> { 2724 if (error != null) { 2725 future.completeExceptionally(error); 2726 return; 2727 } 2728 getProcedureResult(procId, future, 0); 2729 }); 2730 return future; 2731 } 2732 2733 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2734 addListener( 2735 this.<GetProcedureResultResponse> newMasterCaller() 2736 .action((controller, stub) -> this 2737 .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( 2738 controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2739 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2740 .call(), 2741 (response, error) -> { 2742 if (error != null) { 2743 LOG.warn("failed to get the procedure result procId={}", procId, 2744 ConnectionUtils.translateException(error)); 2745 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2746 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2747 return; 2748 } 2749 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2750 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2751 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2752 return; 2753 } 2754 if (response.hasException()) { 2755 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2756 future.completeExceptionally(ioe); 2757 } else { 2758 future.complete(null); 2759 } 2760 }); 2761 } 2762 2763 private <T> CompletableFuture<T> failedFuture(Throwable error) { 2764 CompletableFuture<T> future = new CompletableFuture<>(); 2765 future.completeExceptionally(error); 2766 return future; 2767 } 2768 2769 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2770 if (error != null) { 2771 future.completeExceptionally(error); 2772 return true; 2773 } 2774 return false; 2775 } 2776 2777 @Override 2778 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2779 return getClusterMetrics(EnumSet.allOf(Option.class)); 2780 } 2781 2782 @Override 2783 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2784 return this 2785 .<ClusterMetrics> newMasterCaller() 2786 .action( 2787 (controller, stub) -> this 2788 .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller, 2789 stub, RequestConverter.buildGetClusterStatusRequest(options), 2790 (s, c, req, done) -> s.getClusterStatus(c, req, done), 2791 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call(); 2792 } 2793 2794 @Override 2795 public CompletableFuture<Void> shutdown() { 2796 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2797 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2798 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 2799 resp -> null)) 2800 .call(); 2801 } 2802 2803 @Override 2804 public CompletableFuture<Void> stopMaster() { 2805 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2806 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 2807 controller, stub, StopMasterRequest.newBuilder().build(), 2808 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 2809 .call(); 2810 } 2811 2812 @Override 2813 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2814 StopServerRequest request = RequestConverter 2815 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 2816 return this.<Void> newAdminCaller().priority(HIGH_QOS) 2817 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2818 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2819 resp -> null)) 2820 .serverName(serverName).call(); 2821 } 2822 2823 @Override 2824 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2825 return this 2826 .<Void> newAdminCaller() 2827 .action( 2828 (controller, stub) -> this 2829 .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall( 2830 controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2831 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2832 .serverName(serverName).call(); 2833 } 2834 2835 @Override 2836 public CompletableFuture<Void> updateConfiguration() { 2837 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2838 addListener( 2839 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 2840 (status, err) -> { 2841 if (err != null) { 2842 future.completeExceptionally(err); 2843 } else { 2844 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2845 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 2846 futures.add(updateConfiguration(status.getMasterName())); 2847 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2848 addListener( 2849 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2850 (result, err2) -> { 2851 if (err2 != null) { 2852 future.completeExceptionally(err2); 2853 } else { 2854 future.complete(result); 2855 } 2856 }); 2857 } 2858 }); 2859 return future; 2860 } 2861 2862 @Override 2863 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 2864 return this 2865 .<Void> newAdminCaller() 2866 .action( 2867 (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall( 2868 controller, stub, RequestConverter.buildRollWALWriterRequest(), 2869 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 2870 .serverName(serverName).call(); 2871 } 2872 2873 @Override 2874 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 2875 return this 2876 .<Void> newAdminCaller() 2877 .action( 2878 (controller, stub) -> this 2879 .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall( 2880 controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s, 2881 c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 2882 .serverName(serverName).call(); 2883 } 2884 2885 @Override 2886 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 2887 return this 2888 .<List<SecurityCapability>> newMasterCaller() 2889 .action( 2890 (controller, stub) -> this 2891 .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> 2892 call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), 2893 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 2894 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 2895 .call(); 2896 } 2897 2898 @Override 2899 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 2900 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 2901 } 2902 2903 @Override 2904 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 2905 TableName tableName) { 2906 Preconditions.checkNotNull(tableName, 2907 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 2908 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 2909 } 2910 2911 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 2912 ServerName serverName) { 2913 return this.<List<RegionMetrics>> newAdminCaller() 2914 .action((controller, stub) -> this 2915 .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>> 2916 adminCall(controller, stub, request, (s, c, req, done) -> 2917 s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics)) 2918 .serverName(serverName).call(); 2919 } 2920 2921 @Override 2922 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 2923 return this 2924 .<Boolean> newMasterCaller() 2925 .action( 2926 (controller, stub) -> this 2927 .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller, 2928 stub, IsInMaintenanceModeRequest.newBuilder().build(), 2929 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 2930 resp -> resp.getInMaintenanceMode())).call(); 2931 } 2932 2933 @Override 2934 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 2935 CompactType compactType) { 2936 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 2937 2938 switch (compactType) { 2939 case MOB: 2940 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 2941 if (err != null) { 2942 future.completeExceptionally(err); 2943 return; 2944 } 2945 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 2946 2947 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 2948 .action((controller, stub) -> this 2949 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 2950 controller, stub, 2951 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 2952 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 2953 .call(), (resp2, err2) -> { 2954 if (err2 != null) { 2955 future.completeExceptionally(err2); 2956 } else { 2957 if (resp2.hasCompactionState()) { 2958 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 2959 } else { 2960 future.complete(CompactionState.NONE); 2961 } 2962 } 2963 }); 2964 }); 2965 break; 2966 case NORMAL: 2967 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 2968 if (err != null) { 2969 future.completeExceptionally(err); 2970 return; 2971 } 2972 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 2973 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 2974 locations.stream().filter(loc -> loc.getServerName() != null) 2975 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 2976 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 2977 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 2978 // If any region compaction state is MAJOR_AND_MINOR 2979 // the table compaction state is MAJOR_AND_MINOR, too. 2980 if (err2 != null) { 2981 future.completeExceptionally(unwrapCompletionException(err2)); 2982 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 2983 future.complete(regionState); 2984 } else { 2985 regionStates.add(regionState); 2986 } 2987 })); 2988 }); 2989 addListener( 2990 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2991 (ret, err3) -> { 2992 // If future not completed, check all regions's compaction state 2993 if (!future.isCompletedExceptionally() && !future.isDone()) { 2994 CompactionState state = CompactionState.NONE; 2995 for (CompactionState regionState : regionStates) { 2996 switch (regionState) { 2997 case MAJOR: 2998 if (state == CompactionState.MINOR) { 2999 future.complete(CompactionState.MAJOR_AND_MINOR); 3000 } else { 3001 state = CompactionState.MAJOR; 3002 } 3003 break; 3004 case MINOR: 3005 if (state == CompactionState.MAJOR) { 3006 future.complete(CompactionState.MAJOR_AND_MINOR); 3007 } else { 3008 state = CompactionState.MINOR; 3009 } 3010 break; 3011 case NONE: 3012 default: 3013 } 3014 } 3015 if (!future.isDone()) { 3016 future.complete(state); 3017 } 3018 } 3019 }); 3020 }); 3021 break; 3022 default: 3023 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3024 } 3025 3026 return future; 3027 } 3028 3029 @Override 3030 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3031 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3032 addListener(getRegionLocation(regionName), (location, err) -> { 3033 if (err != null) { 3034 future.completeExceptionally(err); 3035 return; 3036 } 3037 ServerName serverName = location.getServerName(); 3038 if (serverName == null) { 3039 future 3040 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3041 return; 3042 } 3043 addListener( 3044 this.<GetRegionInfoResponse> newAdminCaller() 3045 .action((controller, stub) -> this 3046 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 3047 controller, stub, 3048 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3049 true), 3050 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3051 .serverName(serverName).call(), 3052 (resp2, err2) -> { 3053 if (err2 != null) { 3054 future.completeExceptionally(err2); 3055 } else { 3056 if (resp2.hasCompactionState()) { 3057 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3058 } else { 3059 future.complete(CompactionState.NONE); 3060 } 3061 } 3062 }); 3063 }); 3064 return future; 3065 } 3066 3067 @Override 3068 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3069 MajorCompactionTimestampRequest request = 3070 MajorCompactionTimestampRequest.newBuilder() 3071 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3072 return this.<Optional<Long>> newMasterCaller().action((controller, stub) -> 3073 this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> 3074 call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp( 3075 c, req, done), ProtobufUtil::toOptionalTimestamp)).call(); 3076 } 3077 3078 @Override 3079 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion( 3080 byte[] regionName) { 3081 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3082 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3083 addListener(getRegionInfo(regionName), (region, err) -> { 3084 if (err != null) { 3085 future.completeExceptionally(err); 3086 return; 3087 } 3088 MajorCompactionTimestampForRegionRequest.Builder builder = 3089 MajorCompactionTimestampForRegionRequest.newBuilder(); 3090 builder.setRegion( 3091 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3092 addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this 3093 .<MajorCompactionTimestampForRegionRequest, 3094 MajorCompactionTimestampResponse, Optional<Long>> call( 3095 controller, stub, builder.build(), 3096 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3097 ProtobufUtil::toOptionalTimestamp)) 3098 .call(), (timestamp, err2) -> { 3099 if (err2 != null) { 3100 future.completeExceptionally(err2); 3101 } else { 3102 future.complete(timestamp); 3103 } 3104 }); 3105 }); 3106 return future; 3107 } 3108 3109 @Override 3110 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3111 List<String> serverNamesList) { 3112 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3113 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3114 if (err != null) { 3115 future.completeExceptionally(err); 3116 return; 3117 } 3118 // Accessed by multiple threads. 3119 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3120 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3121 serverNames.stream().forEach(serverName -> { 3122 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3123 if (err2 != null) { 3124 future.completeExceptionally(unwrapCompletionException(err2)); 3125 } else { 3126 serverStates.put(serverName, serverState); 3127 } 3128 })); 3129 }); 3130 addListener( 3131 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3132 (ret, err3) -> { 3133 if (!future.isCompletedExceptionally()) { 3134 if (err3 != null) { 3135 future.completeExceptionally(err3); 3136 } else { 3137 future.complete(serverStates); 3138 } 3139 } 3140 }); 3141 }); 3142 return future; 3143 } 3144 3145 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3146 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3147 if (serverNamesList.isEmpty()) { 3148 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3149 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3150 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3151 if (err != null) { 3152 future.completeExceptionally(err); 3153 } else { 3154 future.complete(clusterMetrics.getServersName()); 3155 } 3156 }); 3157 return future; 3158 } else { 3159 List<ServerName> serverList = new ArrayList<>(); 3160 for (String regionServerName : serverNamesList) { 3161 ServerName serverName = null; 3162 try { 3163 serverName = ServerName.valueOf(regionServerName); 3164 } catch (Exception e) { 3165 future.completeExceptionally( 3166 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3167 } 3168 if (serverName == null) { 3169 future.completeExceptionally( 3170 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3171 } else { 3172 serverList.add(serverName); 3173 } 3174 } 3175 future.complete(serverList); 3176 } 3177 return future; 3178 } 3179 3180 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3181 return this 3182 .<Boolean>newAdminCaller() 3183 .serverName(serverName) 3184 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3185 Boolean>adminCall(controller, stub, 3186 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) -> 3187 s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call(); 3188 } 3189 3190 @Override 3191 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3192 return this.<Boolean> newMasterCaller() 3193 .action((controller, stub) -> this 3194 .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub, 3195 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3196 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3197 (resp) -> resp.getPrevBalanceValue())) 3198 .call(); 3199 } 3200 3201 @Override 3202 public CompletableFuture<Boolean> balance(boolean forcible) { 3203 return this 3204 .<Boolean> newMasterCaller() 3205 .action( 3206 (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller, 3207 stub, RequestConverter.buildBalanceRequest(forcible), 3208 (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call(); 3209 } 3210 3211 @Override 3212 public CompletableFuture<Boolean> isBalancerEnabled() { 3213 return this 3214 .<Boolean> newMasterCaller() 3215 .action((controller, stub) -> 3216 this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller, 3217 stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done) 3218 -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call(); 3219 } 3220 3221 @Override 3222 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3223 return this 3224 .<Boolean> newMasterCaller() 3225 .action( 3226 (controller, stub) -> this 3227 .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call( 3228 controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c, 3229 req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp 3230 .getPrevNormalizerValue())).call(); 3231 } 3232 3233 @Override 3234 public CompletableFuture<Boolean> isNormalizerEnabled() { 3235 return this 3236 .<Boolean> newMasterCaller() 3237 .action( 3238 (controller, stub) -> this 3239 .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller, 3240 stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3241 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), 3242 (resp) -> resp.getEnabled())).call(); 3243 } 3244 3245 @Override 3246 public CompletableFuture<Boolean> normalize() { 3247 return this 3248 .<Boolean> newMasterCaller() 3249 .action( 3250 (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call( 3251 controller, stub, RequestConverter.buildNormalizeRequest(), 3252 (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan())) 3253 .call(); 3254 } 3255 3256 @Override 3257 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3258 return this 3259 .<Boolean> newMasterCaller() 3260 .action( 3261 (controller, stub) -> this 3262 .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call( 3263 controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s, 3264 c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp 3265 .getPrevValue())).call(); 3266 } 3267 3268 @Override 3269 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3270 return this 3271 .<Boolean> newMasterCaller() 3272 .action( 3273 (controller, stub) -> this 3274 .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call( 3275 controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req, 3276 done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3277 .call(); 3278 } 3279 3280 @Override 3281 public CompletableFuture<Boolean> runCleanerChore() { 3282 return this 3283 .<Boolean> newMasterCaller() 3284 .action( 3285 (controller, stub) -> this 3286 .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub, 3287 RequestConverter.buildRunCleanerChoreRequest(), 3288 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3289 (resp) -> resp.getCleanerChoreRan())).call(); 3290 } 3291 3292 @Override 3293 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3294 return this 3295 .<Boolean> newMasterCaller() 3296 .action( 3297 (controller, stub) -> this 3298 .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call( 3299 controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s, 3300 c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp 3301 .getPrevValue())).call(); 3302 } 3303 3304 @Override 3305 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3306 return this 3307 .<Boolean> newMasterCaller() 3308 .action( 3309 (controller, stub) -> this 3310 .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call( 3311 controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c, 3312 req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp 3313 .getValue())).call(); 3314 } 3315 3316 @Override 3317 public CompletableFuture<Integer> runCatalogJanitor() { 3318 return this 3319 .<Integer> newMasterCaller() 3320 .action( 3321 (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call( 3322 controller, stub, RequestConverter.buildCatalogScanRequest(), 3323 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3324 .call(); 3325 } 3326 3327 @Override 3328 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3329 ServiceCaller<S, R> callable) { 3330 MasterCoprocessorRpcChannelImpl channel = 3331 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3332 S stub = stubMaker.apply(channel); 3333 CompletableFuture<R> future = new CompletableFuture<>(); 3334 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3335 callable.call(stub, controller, resp -> { 3336 if (controller.failed()) { 3337 future.completeExceptionally(controller.getFailed()); 3338 } else { 3339 future.complete(resp); 3340 } 3341 }); 3342 return future; 3343 } 3344 3345 @Override 3346 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3347 ServiceCaller<S, R> callable, ServerName serverName) { 3348 RegionServerCoprocessorRpcChannelImpl channel = 3349 new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName( 3350 serverName)); 3351 S stub = stubMaker.apply(channel); 3352 CompletableFuture<R> future = new CompletableFuture<>(); 3353 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3354 callable.call(stub, controller, resp -> { 3355 if (controller.failed()) { 3356 future.completeExceptionally(controller.getFailed()); 3357 } else { 3358 future.complete(resp); 3359 } 3360 }); 3361 return future; 3362 } 3363 3364 @Override 3365 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3366 return this.<List<ServerName>> newMasterCaller() 3367 .action((controller, stub) -> this 3368 .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call( 3369 controller, stub, RequestConverter.buildClearDeadServersRequest(servers), 3370 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3371 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3372 .call(); 3373 } 3374 3375 private <T> ServerRequestCallerBuilder<T> newServerCaller() { 3376 return this.connection.callerFactory.<T> serverRequest() 3377 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3378 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3379 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 3380 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3381 } 3382 3383 @Override 3384 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3385 if (tableName == null) { 3386 return failedFuture(new IllegalArgumentException("Table name is null")); 3387 } 3388 CompletableFuture<Void> future = new CompletableFuture<>(); 3389 addListener(tableExists(tableName), (exist, err) -> { 3390 if (err != null) { 3391 future.completeExceptionally(err); 3392 return; 3393 } 3394 if (!exist) { 3395 future.completeExceptionally(new TableNotFoundException( 3396 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3397 return; 3398 } 3399 addListener(getTableSplits(tableName), (splits, err1) -> { 3400 if (err1 != null) { 3401 future.completeExceptionally(err1); 3402 } else { 3403 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3404 if (err2 != null) { 3405 future.completeExceptionally(err2); 3406 } else { 3407 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3408 if (err3 != null) { 3409 future.completeExceptionally(err3); 3410 } else { 3411 future.complete(result3); 3412 } 3413 }); 3414 } 3415 }); 3416 } 3417 }); 3418 }); 3419 return future; 3420 } 3421 3422 @Override 3423 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3424 if (tableName == null) { 3425 return failedFuture(new IllegalArgumentException("Table name is null")); 3426 } 3427 CompletableFuture<Void> future = new CompletableFuture<>(); 3428 addListener(tableExists(tableName), (exist, err) -> { 3429 if (err != null) { 3430 future.completeExceptionally(err); 3431 return; 3432 } 3433 if (!exist) { 3434 future.completeExceptionally(new TableNotFoundException( 3435 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3436 return; 3437 } 3438 addListener(setTableReplication(tableName, false), (result, err2) -> { 3439 if (err2 != null) { 3440 future.completeExceptionally(err2); 3441 } else { 3442 future.complete(result); 3443 } 3444 }); 3445 }); 3446 return future; 3447 } 3448 3449 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3450 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3451 addListener( 3452 getRegions(tableName).thenApply(regions -> regions.stream() 3453 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3454 (regions, err2) -> { 3455 if (err2 != null) { 3456 future.completeExceptionally(err2); 3457 return; 3458 } 3459 if (regions.size() == 1) { 3460 future.complete(null); 3461 } else { 3462 byte[][] splits = new byte[regions.size() - 1][]; 3463 for (int i = 1; i < regions.size(); i++) { 3464 splits[i - 1] = regions.get(i).getStartKey(); 3465 } 3466 future.complete(splits); 3467 } 3468 }); 3469 return future; 3470 } 3471 3472 /** 3473 * Connect to peer and check the table descriptor on peer: 3474 * <ol> 3475 * <li>Create the same table on peer when not exist.</li> 3476 * <li>Throw an exception if the table already has replication enabled on any of the column 3477 * families.</li> 3478 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3479 * </ol> 3480 * @param tableName name of the table to sync to the peer 3481 * @param splits table split keys 3482 */ 3483 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3484 byte[][] splits) { 3485 CompletableFuture<Void> future = new CompletableFuture<>(); 3486 addListener(listReplicationPeers(), (peers, err) -> { 3487 if (err != null) { 3488 future.completeExceptionally(err); 3489 return; 3490 } 3491 if (peers == null || peers.size() <= 0) { 3492 future.completeExceptionally( 3493 new IllegalArgumentException("Found no peer cluster for replication.")); 3494 return; 3495 } 3496 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3497 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3498 .forEach(peer -> { 3499 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3500 }); 3501 addListener( 3502 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3503 (result, err2) -> { 3504 if (err2 != null) { 3505 future.completeExceptionally(err2); 3506 } else { 3507 future.complete(result); 3508 } 3509 }); 3510 }); 3511 return future; 3512 } 3513 3514 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3515 ReplicationPeerDescription peer) { 3516 Configuration peerConf = null; 3517 try { 3518 peerConf = 3519 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3520 } catch (IOException e) { 3521 return failedFuture(e); 3522 } 3523 CompletableFuture<Void> future = new CompletableFuture<>(); 3524 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3525 if (err != null) { 3526 future.completeExceptionally(err); 3527 return; 3528 } 3529 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3530 if (err1 != null) { 3531 future.completeExceptionally(err1); 3532 return; 3533 } 3534 AsyncAdmin peerAdmin = conn.getAdmin(); 3535 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3536 if (err2 != null) { 3537 future.completeExceptionally(err2); 3538 return; 3539 } 3540 if (!exist) { 3541 CompletableFuture<Void> createTableFuture = null; 3542 if (splits == null) { 3543 createTableFuture = peerAdmin.createTable(tableDesc); 3544 } else { 3545 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3546 } 3547 addListener(createTableFuture, (result, err3) -> { 3548 if (err3 != null) { 3549 future.completeExceptionally(err3); 3550 } else { 3551 future.complete(result); 3552 } 3553 }); 3554 } else { 3555 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3556 (result, err4) -> { 3557 if (err4 != null) { 3558 future.completeExceptionally(err4); 3559 } else { 3560 future.complete(result); 3561 } 3562 }); 3563 } 3564 }); 3565 }); 3566 }); 3567 return future; 3568 } 3569 3570 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3571 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3572 CompletableFuture<Void> future = new CompletableFuture<>(); 3573 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3574 if (err != null) { 3575 future.completeExceptionally(err); 3576 return; 3577 } 3578 if (peerTableDesc == null) { 3579 future.completeExceptionally( 3580 new IllegalArgumentException("Failed to get table descriptor for table " + 3581 tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3582 return; 3583 } 3584 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3585 future.completeExceptionally(new IllegalArgumentException( 3586 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() + 3587 ", but the table descriptors are not same when compared with source cluster." + 3588 " Thus can not enable the table's replication switch.")); 3589 return; 3590 } 3591 future.complete(null); 3592 }); 3593 return future; 3594 } 3595 3596 /** 3597 * Set the table's replication switch if the table's replication switch is already not set. 3598 * @param tableName name of the table 3599 * @param enableRep is replication switch enable or disable 3600 */ 3601 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3602 CompletableFuture<Void> future = new CompletableFuture<>(); 3603 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3604 if (err != null) { 3605 future.completeExceptionally(err); 3606 return; 3607 } 3608 if (!tableDesc.matchReplicationScope(enableRep)) { 3609 int scope = 3610 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3611 TableDescriptor newTableDesc = 3612 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3613 addListener(modifyTable(newTableDesc), (result, err2) -> { 3614 if (err2 != null) { 3615 future.completeExceptionally(err2); 3616 } else { 3617 future.complete(result); 3618 } 3619 }); 3620 } else { 3621 future.complete(null); 3622 } 3623 }); 3624 return future; 3625 } 3626 3627 @Override 3628 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3629 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3630 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3631 if (err != null) { 3632 future.completeExceptionally(err); 3633 return; 3634 } 3635 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3636 locations.stream().filter(l -> l.getRegion() != null) 3637 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3638 .collect(Collectors.groupingBy(l -> l.getServerName(), 3639 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3640 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3641 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3642 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3643 futures 3644 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3645 if (err2 != null) { 3646 future.completeExceptionally(unwrapCompletionException(err2)); 3647 } else { 3648 aggregator.append(stats); 3649 } 3650 })); 3651 } 3652 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3653 (ret, err3) -> { 3654 if (err3 != null) { 3655 future.completeExceptionally(unwrapCompletionException(err3)); 3656 } else { 3657 future.complete(aggregator.sum()); 3658 } 3659 }); 3660 }); 3661 return future; 3662 } 3663 3664 @Override 3665 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 3666 boolean preserveSplits) { 3667 CompletableFuture<Void> future = new CompletableFuture<>(); 3668 addListener(tableExists(tableName), (exist, err) -> { 3669 if (err != null) { 3670 future.completeExceptionally(err); 3671 return; 3672 } 3673 if (!exist) { 3674 future.completeExceptionally(new TableNotFoundException(tableName)); 3675 return; 3676 } 3677 addListener(tableExists(newTableName), (exist1, err1) -> { 3678 if (err1 != null) { 3679 future.completeExceptionally(err1); 3680 return; 3681 } 3682 if (exist1) { 3683 future.completeExceptionally(new TableExistsException(newTableName)); 3684 return; 3685 } 3686 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 3687 if (err2 != null) { 3688 future.completeExceptionally(err2); 3689 return; 3690 } 3691 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 3692 if (preserveSplits) { 3693 addListener(getTableSplits(tableName), (splits, err3) -> { 3694 if (err3 != null) { 3695 future.completeExceptionally(err3); 3696 } else { 3697 addListener( 3698 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 3699 (result, err4) -> { 3700 if (err4 != null) { 3701 future.completeExceptionally(err4); 3702 } else { 3703 future.complete(result); 3704 } 3705 }); 3706 } 3707 }); 3708 } else { 3709 addListener(createTable(newTableDesc), (result, err5) -> { 3710 if (err5 != null) { 3711 future.completeExceptionally(err5); 3712 } else { 3713 future.complete(result); 3714 } 3715 }); 3716 } 3717 }); 3718 }); 3719 }); 3720 return future; 3721 } 3722 3723 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 3724 List<RegionInfo> hris) { 3725 return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this 3726 .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall( 3727 controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris), 3728 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 3729 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 3730 .serverName(serverName).call(); 3731 } 3732 3733 @Override 3734 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 3735 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3736 .action((controller, stub) -> this 3737 .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub, 3738 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 3739 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 3740 resp -> resp.getPreviousRpcThrottleEnabled())) 3741 .call(); 3742 return future; 3743 } 3744 3745 @Override 3746 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 3747 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3748 .action((controller, stub) -> this 3749 .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller, 3750 stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 3751 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 3752 resp -> resp.getRpcThrottleEnabled())) 3753 .call(); 3754 return future; 3755 } 3756 3757 @Override 3758 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 3759 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3760 .action((controller, stub) -> this 3761 .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call( 3762 controller, stub, 3763 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 3764 .build(), 3765 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 3766 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 3767 .call(); 3768 return future; 3769 } 3770 3771 @Override 3772 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 3773 return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this 3774 .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse, 3775 Map<TableName, Long>> call(controller, stub, 3776 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 3777 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 3778 resp -> resp.getSizesList().stream().collect(Collectors 3779 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 3780 .call(); 3781 } 3782 3783 @Override 3784 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots( 3785 ServerName serverName) { 3786 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 3787 .action((controller, stub) -> this 3788 .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse, 3789 Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub, 3790 RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 3791 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 3792 resp -> resp.getSnapshotsList().stream() 3793 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 3794 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 3795 .serverName(serverName).call(); 3796 } 3797 3798 private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot( 3799 Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 3800 return this.<SpaceQuotaSnapshot> newMasterCaller() 3801 .action((controller, stub) -> this 3802 .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub, 3803 RequestConverter.buildGetQuotaStatesRequest(), 3804 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 3805 .call(); 3806 } 3807 3808 @Override 3809 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 3810 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 3811 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 3812 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3813 } 3814 3815 @Override 3816 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 3817 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 3818 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 3819 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 3820 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3821 } 3822 3823 @Override 3824 public CompletableFuture<Void> grant(UserPermission userPermission, 3825 boolean mergeExistingPermissions) { 3826 return this.<Void> newMasterCaller() 3827 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, 3828 stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 3829 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 3830 .call(); 3831 } 3832 3833 @Override 3834 public CompletableFuture<Void> revoke(UserPermission userPermission) { 3835 return this.<Void> newMasterCaller() 3836 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 3837 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 3838 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 3839 .call(); 3840 } 3841 3842 @Override 3843 public CompletableFuture<List<UserPermission>> 3844 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 3845 return this.<List<UserPermission>> newMasterCaller().action((controller, 3846 stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse, 3847 List<UserPermission>> call(controller, stub, 3848 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 3849 (s, c, req, done) -> s.getUserPermissions(c, req, done), 3850 resp -> resp.getUserPermissionList().stream() 3851 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 3852 .collect(Collectors.toList()))) 3853 .call(); 3854 } 3855 3856 @Override 3857 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 3858 List<Permission> permissions) { 3859 return this.<List<Boolean>> newMasterCaller() 3860 .action((controller, stub) -> this 3861 .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller, 3862 stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 3863 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 3864 resp -> resp.getHasUserPermissionList())) 3865 .call(); 3866 } 3867 3868 @Override 3869 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, 3870 final boolean sync) { 3871 return this.<Boolean>newMasterCaller() 3872 .action((controller, stub) -> this 3873 .call(controller, stub, 3874 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 3875 MasterService.Interface::switchSnapshotCleanup, 3876 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 3877 .call(); 3878 } 3879 3880 @Override 3881 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 3882 return this.<Boolean>newMasterCaller() 3883 .action((controller, stub) -> this 3884 .call(controller, stub, 3885 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 3886 MasterService.Interface::isSnapshotCleanupEnabled, 3887 IsSnapshotCleanupEnabledResponse::getEnabled)) 3888 .call(); 3889 } 3890 3891 @Override 3892 public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses( 3893 @Nullable final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) { 3894 if (CollectionUtils.isEmpty(serverNames)) { 3895 return CompletableFuture.completedFuture(Collections.emptyList()); 3896 } 3897 if (logQueryFilter.getType() == null 3898 || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) { 3899 return CompletableFuture.supplyAsync(() -> serverNames.stream() 3900 .map((ServerName serverName) -> 3901 getSlowLogResponseFromServer(serverName, logQueryFilter)) 3902 .map(CompletableFuture::join) 3903 .flatMap(List::stream) 3904 .collect(Collectors.toList())); 3905 } else { 3906 return CompletableFuture.supplyAsync(() -> serverNames.stream() 3907 .map((ServerName serverName) -> 3908 getLargeLogResponseFromServer(serverName, logQueryFilter)) 3909 .map(CompletableFuture::join) 3910 .flatMap(List::stream) 3911 .collect(Collectors.toList())); 3912 } 3913 } 3914 3915 private CompletableFuture<List<OnlineLogRecord>> getLargeLogResponseFromServer( 3916 final ServerName serverName, final LogQueryFilter logQueryFilter) { 3917 return this.<List<OnlineLogRecord>>newAdminCaller() 3918 .action((controller, stub) -> this 3919 .adminCall( 3920 controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter), 3921 AdminService.Interface::getLargeLogResponses, 3922 ProtobufUtil::toSlowLogPayloads)) 3923 .serverName(serverName).call(); 3924 } 3925 3926 private CompletableFuture<List<OnlineLogRecord>> getSlowLogResponseFromServer( 3927 final ServerName serverName, final LogQueryFilter logQueryFilter) { 3928 return this.<List<OnlineLogRecord>>newAdminCaller() 3929 .action((controller, stub) -> this 3930 .adminCall( 3931 controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter), 3932 AdminService.Interface::getSlowLogResponses, 3933 ProtobufUtil::toSlowLogPayloads)) 3934 .serverName(serverName).call(); 3935 } 3936 3937 @Override 3938 public CompletableFuture<List<Boolean>> clearSlowLogResponses( 3939 @Nullable Set<ServerName> serverNames) { 3940 if (CollectionUtils.isEmpty(serverNames)) { 3941 return CompletableFuture.completedFuture(Collections.emptyList()); 3942 } 3943 List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream() 3944 .map(this::clearSlowLogsResponses) 3945 .collect(Collectors.toList()); 3946 return convertToFutureOfList(clearSlowLogResponseList); 3947 } 3948 3949 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 3950 return this.<Boolean>newAdminCaller() 3951 .action(((controller, stub) -> this 3952 .adminCall( 3953 controller, stub, RequestConverter.buildClearSlowLogResponseRequest(), 3954 AdminService.Interface::clearSlowLogsResponses, 3955 ProtobufUtil::toClearSlowLogPayload)) 3956 ).serverName(serverName).call(); 3957 } 3958 3959 private static <T> CompletableFuture<List<T>> convertToFutureOfList( 3960 List<CompletableFuture<T>> futures) { 3961 CompletableFuture<Void> allDoneFuture = 3962 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 3963 return allDoneFuture.thenApply(v -> 3964 futures.stream() 3965 .map(CompletableFuture::join) 3966 .collect(Collectors.toList()) 3967 ); 3968 } 3969 3970}