001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.Set; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentLinkedQueue; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.function.BiConsumer; 042import java.util.function.Consumer; 043import java.util.function.Function; 044import java.util.function.Supplier; 045import java.util.regex.Pattern; 046import 
java.util.stream.Collectors; 047import java.util.stream.Stream; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.CacheEvictionStats; 050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 051import org.apache.hadoop.hbase.CatalogFamilyFormat; 052import org.apache.hadoop.hbase.ClientMetaTableAccessor; 053import org.apache.hadoop.hbase.ClusterMetrics; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.ClusterMetricsBuilder; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.NamespaceDescriptor; 059import org.apache.hadoop.hbase.RegionLocations; 060import org.apache.hadoop.hbase.RegionMetrics; 061import org.apache.hadoop.hbase.RegionMetricsBuilder; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableExistsException; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.TableNotDisabledException; 066import org.apache.hadoop.hbase.TableNotEnabledException; 067import org.apache.hadoop.hbase.TableNotFoundException; 068import org.apache.hadoop.hbase.UnknownRegionException; 069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.Scan.ReadType; 073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 074import org.apache.hadoop.hbase.client.replication.TableCFs; 075import org.apache.hadoop.hbase.client.security.SecurityCapability; 076import org.apache.hadoop.hbase.exceptions.DeserializationException; 077import org.apache.hadoop.hbase.ipc.HBaseRpcController; 078import org.apache.hadoop.hbase.net.Address; 079import org.apache.hadoop.hbase.quotas.QuotaFilter; 
080import org.apache.hadoop.hbase.quotas.QuotaSettings; 081import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 083import org.apache.hadoop.hbase.replication.ReplicationException; 084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 086import org.apache.hadoop.hbase.replication.SyncReplicationState; 087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 089import org.apache.hadoop.hbase.security.access.Permission; 090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 091import org.apache.hadoop.hbase.security.access.UserPermission; 092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 095import org.apache.hadoop.hbase.util.Bytes; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.yetus.audience.InterfaceAudience; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 104import org.apache.hbase.thirdparty.com.google.protobuf.Message; 105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 106import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 107import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 108import org.apache.hbase.thirdparty.io.netty.util.Timeout; 109import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 110import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 111 112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 113import 
org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 133import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 154import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 174import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 194import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 213import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 232import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 252import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 272import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 292import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 310import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 329import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 337 338/** 339 * The implementation of AsyncAdmin. 340 * <p> 341 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 342 * be finished inside the rpc framework thread, which means that the callbacks registered to the 343 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 344 * this class should not try to do time consuming tasks in the callbacks. 
345 * @since 2.0.0 346 * @see AsyncHBaseAdmin 347 * @see AsyncConnection#getAdmin() 348 * @see AsyncConnection#getAdminBuilder() 349 */ 350@InterfaceAudience.Private 351class RawAsyncHBaseAdmin implements AsyncAdmin { 352 353 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 354 355 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 356 357 private final AsyncConnectionImpl connection; 358 359 private final HashedWheelTimer retryTimer; 360 361 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 362 363 private final long rpcTimeoutNs; 364 365 private final long operationTimeoutNs; 366 367 private final long pauseNs; 368 369 private final long pauseNsForServerOverloaded; 370 371 private final int maxAttempts; 372 373 private final int startLogErrorsCnt; 374 375 private final NonceGenerator ng; 376 377 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 378 AsyncAdminBuilderBase builder) { 379 this.connection = connection; 380 this.retryTimer = retryTimer; 381 this.metaTable = connection.getTable(META_TABLE_NAME); 382 this.rpcTimeoutNs = builder.rpcTimeoutNs; 383 this.operationTimeoutNs = builder.operationTimeoutNs; 384 this.pauseNs = builder.pauseNs; 385 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 386 LOG.warn( 387 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 388 + " the normal pause value {} ms, use the greater one instead", 389 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 390 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 391 this.pauseNsForServerOverloaded = builder.pauseNs; 392 } else { 393 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 394 } 395 this.maxAttempts = builder.maxAttempts; 396 this.startLogErrorsCnt = builder.startLogErrorsCnt; 397 this.ng = connection.getNonceGenerator(); 398 } 399 400 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 401 return 
this.connection.callerFactory.<T> masterRequest() 402 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 403 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 404 .pause(pauseNs, TimeUnit.NANOSECONDS) 405 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 406 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 407 } 408 409 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 410 return this.connection.callerFactory.<T> adminRequest() 411 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 412 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 413 .pause(pauseNs, TimeUnit.NANOSECONDS) 414 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 415 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 416 } 417 418 @FunctionalInterface 419 private interface MasterRpcCall<RESP, REQ> { 420 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 421 RpcCallback<RESP> done); 422 } 423 424 @FunctionalInterface 425 private interface AdminRpcCall<RESP, REQ> { 426 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 427 RpcCallback<RESP> done); 428 } 429 430 @FunctionalInterface 431 private interface Converter<D, S> { 432 D convert(S src) throws IOException; 433 } 434 435 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 436 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 437 Converter<RESP, PRESP> respConverter) { 438 CompletableFuture<RESP> future = new CompletableFuture<>(); 439 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 440 441 @Override 442 public void run(PRESP resp) { 443 if (controller.failed()) { 444 future.completeExceptionally(controller.getFailed()); 445 } else { 446 try { 447 future.complete(respConverter.convert(resp)); 448 } catch (IOException e) { 449 future.completeExceptionally(e); 450 } 451 } 452 } 453 }); 454 return future; 455 } 456 
457 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 458 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 459 Converter<RESP, PRESP> respConverter) { 460 CompletableFuture<RESP> future = new CompletableFuture<>(); 461 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 462 463 @Override 464 public void run(PRESP resp) { 465 if (controller.failed()) { 466 future.completeExceptionally(controller.getFailed()); 467 } else { 468 try { 469 future.complete(respConverter.convert(resp)); 470 } catch (IOException e) { 471 future.completeExceptionally(e); 472 } 473 } 474 } 475 }); 476 return future; 477 } 478 479 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 480 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 481 ProcedureBiConsumer consumer) { 482 return procedureCall(b -> { 483 }, preq, rpcCall, respConverter, consumer); 484 } 485 486 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 487 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 488 ProcedureBiConsumer consumer) { 489 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 490 } 491 492 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 493 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 494 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 495 ProcedureBiConsumer consumer) { 496 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 497 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 498 prioritySetter.accept(builder); 499 CompletableFuture<Long> procFuture = builder.call(); 500 CompletableFuture<Void> future = waitProcedureResult(procFuture); 501 addListener(future, consumer); 502 return future; 503 } 504 505 @FunctionalInterface 506 private interface 
TableOperator { 507 CompletableFuture<Void> operate(TableName table); 508 } 509 510 @Override 511 public CompletableFuture<Boolean> tableExists(TableName tableName) { 512 if (TableName.isMetaTableName(tableName)) { 513 return CompletableFuture.completedFuture(true); 514 } 515 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 516 } 517 518 @Override 519 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 520 return getTableDescriptors( 521 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 522 } 523 524 /** 525 * {@link #listTableDescriptors(boolean)} 526 */ 527 @Override 528 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 529 boolean includeSysTables) { 530 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 531 + "use listTableDescriptors(boolean) instead"); 532 return getTableDescriptors( 533 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 534 } 535 536 @Override 537 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 538 Preconditions.checkNotNull(tableNames, "tableNames is null. 
If you don't specify tableNames, " 539 + "use listTableDescriptors(boolean) instead"); 540 if (tableNames.isEmpty()) { 541 return CompletableFuture.completedFuture(Collections.emptyList()); 542 } 543 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 544 } 545 546 private CompletableFuture<List<TableDescriptor>> 547 getTableDescriptors(GetTableDescriptorsRequest request) { 548 return this.<List<TableDescriptor>> newMasterCaller() 549 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 550 List<TableDescriptor>> call(controller, stub, request, 551 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 552 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 553 .call(); 554 } 555 556 @Override 557 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 558 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 559 } 560 561 @Override 562 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 563 boolean includeSysTables) { 564 Preconditions.checkNotNull(pattern, 565 "pattern is null. 
If you don't specify a pattern, use listTableNames(boolean) instead"); 566 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 567 } 568 569 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 570 return this.<List<TableName>> newMasterCaller() 571 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 572 List<TableName>> call(controller, stub, request, 573 (s, c, req, done) -> s.getTableNames(c, req, done), 574 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 575 .call(); 576 } 577 578 @Override 579 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 580 return this.<List<TableDescriptor>> newMasterCaller() 581 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 582 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 583 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 584 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 585 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 586 .call(); 587 } 588 589 @Override 590 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 591 return this.<List<TableName>> newMasterCaller() 592 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 593 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 594 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 595 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 596 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 597 .call(); 598 } 599 600 @Override 601 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 602 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 603 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 604 
.action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 605 List<TableSchema>> call(controller, stub, 606 RequestConverter.buildGetTableDescriptorsRequest(tableName), 607 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 608 (resp) -> resp.getTableSchemaList())) 609 .call(), (tableSchemas, error) -> { 610 if (error != null) { 611 future.completeExceptionally(error); 612 return; 613 } 614 if (!tableSchemas.isEmpty()) { 615 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 616 } else { 617 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 618 } 619 }); 620 return future; 621 } 622 623 @Override 624 public CompletableFuture<Void> createTable(TableDescriptor desc) { 625 return createTable(desc.getTableName(), 626 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 627 } 628 629 @Override 630 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 631 int numRegions) { 632 try { 633 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 634 } catch (IllegalArgumentException e) { 635 return failedFuture(e); 636 } 637 } 638 639 @Override 640 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 641 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 642 + " use createTable(TableDescriptor) instead"); 643 try { 644 verifySplitKeys(splitKeys); 645 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 646 splitKeys, ng.getNonceGroup(), ng.newNonce())); 647 } catch (IllegalArgumentException e) { 648 return failedFuture(e); 649 } 650 } 651 652 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 653 Preconditions.checkNotNull(tableName, "table name is null"); 654 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 655 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 656 new CreateTableProcedureBiConsumer(tableName)); 657 } 658 659 @Override 660 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 661 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 662 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 663 ng.newNonce()), 664 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 665 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 666 } 667 668 @Override 669 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 670 return this.<ModifyTableStoreFileTrackerRequest, 671 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 672 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 673 ng.getNonceGroup(), ng.newNonce()), 674 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 675 (resp) -> resp.getProcId(), 676 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 677 } 678 679 @Override 680 public CompletableFuture<Void> deleteTable(TableName tableName) { 681 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 682 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), 
ng.newNonce()), 683 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 684 new DeleteTableProcedureBiConsumer(tableName)); 685 } 686 687 @Override 688 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 689 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 690 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 691 ng.newNonce()), 692 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 693 new TruncateTableProcedureBiConsumer(tableName)); 694 } 695 696 @Override 697 public CompletableFuture<Void> enableTable(TableName tableName) { 698 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 699 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 700 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 701 new EnableTableProcedureBiConsumer(tableName)); 702 } 703 704 @Override 705 public CompletableFuture<Void> disableTable(TableName tableName) { 706 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 707 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 708 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 709 new DisableTableProcedureBiConsumer(tableName)); 710 } 711 712 /** 713 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 714 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 715 * targetState). 
716 */ 717 private static CompletableFuture<Boolean> completeCheckTableState( 718 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 719 TableState.State targetState, TableName tableName) { 720 if (error != null) { 721 future.completeExceptionally(error); 722 } else { 723 if (tableState != null) { 724 future.complete(tableState.inStates(targetState)); 725 } else { 726 future.completeExceptionally(new TableNotFoundException(tableName)); 727 } 728 } 729 return future; 730 } 731 732 @Override 733 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 734 if (TableName.isMetaTableName(tableName)) { 735 return CompletableFuture.completedFuture(true); 736 } 737 CompletableFuture<Boolean> future = new CompletableFuture<>(); 738 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 739 (tableState, error) -> { 740 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 741 TableState.State.ENABLED, tableName); 742 }); 743 return future; 744 } 745 746 @Override 747 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 748 if (TableName.isMetaTableName(tableName)) { 749 return CompletableFuture.completedFuture(false); 750 } 751 CompletableFuture<Boolean> future = new CompletableFuture<>(); 752 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 753 (tableState, error) -> { 754 completeCheckTableState(future, tableState.isPresent() ? 
tableState.get() : null, error, 755 TableState.State.DISABLED, tableName); 756 }); 757 return future; 758 } 759 760 @Override 761 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 762 if (TableName.isMetaTableName(tableName)) { 763 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 764 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 765 } 766 CompletableFuture<Boolean> future = new CompletableFuture<>(); 767 addListener(isTableEnabled(tableName), (enabled, error) -> { 768 if (error != null) { 769 if (error instanceof TableNotFoundException) { 770 future.complete(false); 771 } else { 772 future.completeExceptionally(error); 773 } 774 return; 775 } 776 if (!enabled) { 777 future.complete(false); 778 } else { 779 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 780 (locations, error1) -> { 781 if (error1 != null) { 782 future.completeExceptionally(error1); 783 return; 784 } 785 List<HRegionLocation> notDeployedRegions = locations.stream() 786 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 787 if (notDeployedRegions.size() > 0) { 788 if (LOG.isDebugEnabled()) { 789 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 790 } 791 future.complete(false); 792 return; 793 } 794 future.complete(true); 795 }); 796 } 797 }); 798 return future; 799 } 800 801 @Override 802 public CompletableFuture<Void> addColumnFamily(TableName tableName, 803 ColumnFamilyDescriptor columnFamily) { 804 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 805 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 806 ng.newNonce()), 807 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 808 new AddColumnFamilyProcedureBiConsumer(tableName)); 809 } 810 811 @Override 812 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, 
byte[] columnFamily) { 813 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 814 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 815 ng.newNonce()), 816 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 817 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 818 } 819 820 @Override 821 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 822 ColumnFamilyDescriptor columnFamily) { 823 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 824 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 825 ng.newNonce()), 826 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 827 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 828 } 829 830 @Override 831 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 832 byte[] family, String dstSFT) { 833 return this.<ModifyColumnStoreFileTrackerRequest, 834 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 835 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 836 ng.getNonceGroup(), ng.newNonce()), 837 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 838 (resp) -> resp.getProcId(), 839 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 840 } 841 842 @Override 843 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 844 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 845 RequestConverter.buildCreateNamespaceRequest(descriptor), 846 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 847 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 848 } 849 850 @Override 851 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 852 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> 
procedureCall( 853 RequestConverter.buildModifyNamespaceRequest(descriptor), 854 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 855 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 856 } 857 858 @Override 859 public CompletableFuture<Void> deleteNamespace(String name) { 860 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 861 RequestConverter.buildDeleteNamespaceRequest(name), 862 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 863 new DeleteNamespaceProcedureBiConsumer(name)); 864 } 865 866 @Override 867 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 868 return this.<NamespaceDescriptor> newMasterCaller() 869 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 870 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 871 RequestConverter.buildGetNamespaceDescriptorRequest(name), 872 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 873 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 874 .call(); 875 } 876 877 @Override 878 public CompletableFuture<List<String>> listNamespaces() { 879 return this.<List<String>> newMasterCaller() 880 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 881 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 882 (s, c, req, done) -> s.listNamespaces(c, req, done), 883 (resp) -> resp.getNamespaceNameList())) 884 .call(); 885 } 886 887 @Override 888 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 889 return this.<List<NamespaceDescriptor>> newMasterCaller() 890 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 891 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 892 ListNamespaceDescriptorsRequest.newBuilder().build(), 893 (s, c, req, done) -> 
s.listNamespaceDescriptors(c, req, done), 894 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 895 .call(); 896 } 897 898 @Override 899 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 900 return this.<List<RegionInfo>> newAdminCaller() 901 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 902 List<RegionInfo>> adminCall(controller, stub, 903 RequestConverter.buildGetOnlineRegionRequest(), 904 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 905 resp -> ProtobufUtil.getRegionInfos(resp))) 906 .serverName(serverName).call(); 907 } 908 909 @Override 910 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 911 if (tableName.equals(META_TABLE_NAME)) { 912 return connection.registry.getMetaRegionLocations() 913 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 914 .collect(Collectors.toList())); 915 } else { 916 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 917 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 918 } 919 } 920 921 @Override 922 public CompletableFuture<Void> flush(TableName tableName) { 923 return flush(tableName, null); 924 } 925 926 @Override 927 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 928 CompletableFuture<Void> future = new CompletableFuture<>(); 929 addListener(tableExists(tableName), (exists, err) -> { 930 if (err != null) { 931 future.completeExceptionally(err); 932 } else if (!exists) { 933 future.completeExceptionally(new TableNotFoundException(tableName)); 934 } else { 935 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 936 if (err2 != null) { 937 future.completeExceptionally(err2); 938 } else if (!tableEnabled) { 939 future.completeExceptionally(new TableNotEnabledException(tableName)); 940 } else { 941 Map<String, String> props = new HashMap<>(); 942 if (columnFamily != 
null) { 943 props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); 944 } 945 addListener( 946 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 947 (ret, err3) -> { 948 if (err3 != null) { 949 future.completeExceptionally(err3); 950 } else { 951 future.complete(ret); 952 } 953 }); 954 } 955 }); 956 } 957 }); 958 return future; 959 } 960 961 @Override 962 public CompletableFuture<Void> flushRegion(byte[] regionName) { 963 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 964 }); 965 } 966 967 @Override 968 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 969 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 970 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 971 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 972 }); 973 } 974 975 /** 976 * This method is for internal use only, where we need the response of the flush. 977 * <p/> 978 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 979 * API. 
980 */ 981 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 982 boolean writeFlushWALMarker) { 983 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 984 addListener(getRegionLocation(regionName), (location, err) -> { 985 if (err != null) { 986 future.completeExceptionally(err); 987 return; 988 } 989 ServerName serverName = location.getServerName(); 990 if (serverName == null) { 991 future 992 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 993 return; 994 } 995 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 996 (ret, err2) -> { 997 if (err2 != null) { 998 future.completeExceptionally(err2); 999 } else { 1000 future.complete(ret); 1001 } 1002 }); 1003 }); 1004 return future; 1005 } 1006 1007 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1008 byte[] columnFamily, boolean writeFlushWALMarker) { 1009 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1010 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1011 FlushRegionResponse> adminCall(controller, stub, 1012 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1013 writeFlushWALMarker), 1014 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1015 .call(); 1016 } 1017 1018 @Override 1019 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1020 CompletableFuture<Void> future = new CompletableFuture<>(); 1021 addListener(getRegions(sn), (hRegionInfos, err) -> { 1022 if (err != null) { 1023 future.completeExceptionally(err); 1024 return; 1025 } 1026 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1027 if (hRegionInfos != null) { 1028 hRegionInfos 1029 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1030 }))); 1031 } 1032 
addListener(CompletableFuture.allOf( 1033 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1034 if (err2 != null) { 1035 future.completeExceptionally(err2); 1036 } else { 1037 future.complete(ret); 1038 } 1039 }); 1040 }); 1041 return future; 1042 } 1043 1044 @Override 1045 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1046 return compact(tableName, null, false, compactType); 1047 } 1048 1049 @Override 1050 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1051 CompactType compactType) { 1052 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1053 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1054 return compact(tableName, columnFamily, false, compactType); 1055 } 1056 1057 @Override 1058 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1059 return compactRegion(regionName, null, false); 1060 } 1061 1062 @Override 1063 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1064 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1065 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1066 return compactRegion(regionName, columnFamily, false); 1067 } 1068 1069 @Override 1070 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1071 return compact(tableName, null, true, compactType); 1072 } 1073 1074 @Override 1075 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1076 CompactType compactType) { 1077 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
1078 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1079 return compact(tableName, columnFamily, true, compactType); 1080 } 1081 1082 @Override 1083 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1084 return compactRegion(regionName, null, true); 1085 } 1086 1087 @Override 1088 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1089 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1090 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1091 return compactRegion(regionName, columnFamily, true); 1092 } 1093 1094 @Override 1095 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1096 return compactRegionServer(sn, false); 1097 } 1098 1099 @Override 1100 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1101 return compactRegionServer(sn, true); 1102 } 1103 1104 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1105 CompletableFuture<Void> future = new CompletableFuture<>(); 1106 addListener(getRegions(sn), (hRegionInfos, err) -> { 1107 if (err != null) { 1108 future.completeExceptionally(err); 1109 return; 1110 } 1111 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1112 if (hRegionInfos != null) { 1113 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1114 } 1115 addListener(CompletableFuture.allOf( 1116 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1117 if (err2 != null) { 1118 future.completeExceptionally(err2); 1119 } else { 1120 future.complete(ret); 1121 } 1122 }); 1123 }); 1124 return future; 1125 } 1126 1127 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1128 boolean major) { 1129 CompletableFuture<Void> future = new CompletableFuture<>(); 1130 addListener(getRegionLocation(regionName), (location, err) -> { 1131 
if (err != null) { 1132 future.completeExceptionally(err); 1133 return; 1134 } 1135 ServerName serverName = location.getServerName(); 1136 if (serverName == null) { 1137 future 1138 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1139 return; 1140 } 1141 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1142 (ret, err2) -> { 1143 if (err2 != null) { 1144 future.completeExceptionally(err2); 1145 } else { 1146 future.complete(ret); 1147 } 1148 }); 1149 }); 1150 return future; 1151 } 1152 1153 /** 1154 * List all region locations for the specific table. 1155 */ 1156 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1157 if (TableName.META_TABLE_NAME.equals(tableName)) { 1158 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1159 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1160 if (err != null) { 1161 future.completeExceptionally(err); 1162 } else if ( 1163 metaRegions == null || metaRegions.isEmpty() 1164 || metaRegions.getDefaultRegionLocation() == null 1165 ) { 1166 future.completeExceptionally(new IOException("meta region does not found")); 1167 } else { 1168 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1169 } 1170 }); 1171 return future; 1172 } else { 1173 // For non-meta table, we fetch all locations by scanning hbase:meta table 1174 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1175 } 1176 } 1177 1178 /** 1179 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1180 */ 1181 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1182 CompactType compactType) { 1183 CompletableFuture<Void> future = new CompletableFuture<>(); 1184 1185 switch (compactType) { 1186 case MOB: 1187 
addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1188 if (err != null) { 1189 future.completeExceptionally(err); 1190 return; 1191 } 1192 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1193 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1194 if (err2 != null) { 1195 future.completeExceptionally(err2); 1196 } else { 1197 future.complete(ret); 1198 } 1199 }); 1200 }); 1201 break; 1202 case NORMAL: 1203 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1204 if (err != null) { 1205 future.completeExceptionally(err); 1206 return; 1207 } 1208 if (locations == null || locations.isEmpty()) { 1209 future.completeExceptionally(new TableNotFoundException(tableName)); 1210 } 1211 CompletableFuture<?>[] compactFutures = 1212 locations.stream().filter(l -> l.getRegion() != null) 1213 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1214 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1215 .toArray(CompletableFuture<?>[]::new); 1216 // future complete unless all of the compact futures are completed. 1217 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1218 if (err2 != null) { 1219 future.completeExceptionally(err2); 1220 } else { 1221 future.complete(ret); 1222 } 1223 }); 1224 }); 1225 break; 1226 default: 1227 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1228 } 1229 return future; 1230 } 1231 1232 /** 1233 * Compact the region at specific region server. 
1234 */ 1235 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1236 final boolean major, byte[] columnFamily) { 1237 return this.<Void> newAdminCaller().serverName(sn) 1238 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1239 Void> adminCall(controller, stub, 1240 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1241 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1242 .call(); 1243 } 1244 1245 private byte[] toEncodeRegionName(byte[] regionName) { 1246 return RegionInfo.isEncodedRegionName(regionName) 1247 ? regionName 1248 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1249 } 1250 1251 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1252 CompletableFuture<TableName> result) { 1253 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1254 if (err != null) { 1255 result.completeExceptionally(err); 1256 return; 1257 } 1258 RegionInfo regionInfo = location.getRegion(); 1259 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1260 result.completeExceptionally( 1261 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1262 return; 1263 } 1264 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1265 if (!tableName.get().equals(regionInfo.getTable())) { 1266 // tables of this two region should be same. 
1267 result.completeExceptionally( 1268 new IllegalArgumentException("Cannot merge regions from two different tables " 1269 + tableName.get() + " and " + regionInfo.getTable())); 1270 } else { 1271 result.complete(tableName.get()); 1272 } 1273 } 1274 }); 1275 } 1276 1277 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1278 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1279 CompletableFuture<TableName> future = new CompletableFuture<>(); 1280 for (byte[] encodedRegionName : encodedRegionNames) { 1281 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1282 } 1283 return future; 1284 } 1285 1286 @Override 1287 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1288 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1289 } 1290 1291 @Override 1292 public CompletableFuture<Boolean> isMergeEnabled() { 1293 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1294 } 1295 1296 @Override 1297 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1298 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1299 } 1300 1301 @Override 1302 public CompletableFuture<Boolean> isSplitEnabled() { 1303 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1304 } 1305 1306 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1307 MasterSwitchType switchType) { 1308 SetSplitOrMergeEnabledRequest request = 1309 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1310 return this.<Boolean> newMasterCaller() 1311 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1312 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1313 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1314 (resp) -> resp.getPrevValueList().get(0))) 1315 .call(); 1316 } 1317 1318 private CompletableFuture<Boolean> 
isSplitOrMergeOn(MasterSwitchType switchType) { 1319 IsSplitOrMergeEnabledRequest request = 1320 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1321 return this.<Boolean> newMasterCaller() 1322 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1323 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1324 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1325 .call(); 1326 } 1327 1328 @Override 1329 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1330 if (nameOfRegionsToMerge.size() < 2) { 1331 return failedFuture(new IllegalArgumentException( 1332 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1333 } 1334 CompletableFuture<Void> future = new CompletableFuture<>(); 1335 byte[][] encodedNameOfRegionsToMerge = 1336 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1337 1338 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1339 if (err != null) { 1340 future.completeExceptionally(err); 1341 return; 1342 } 1343 1344 final MergeTableRegionsRequest request; 1345 try { 1346 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1347 forcible, ng.getNonceGroup(), ng.newNonce()); 1348 } catch (DeserializationException e) { 1349 future.completeExceptionally(e); 1350 return; 1351 } 1352 1353 addListener( 1354 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1355 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1356 (ret, err2) -> { 1357 if (err2 != null) { 1358 future.completeExceptionally(err2); 1359 } else { 1360 future.complete(ret); 1361 } 1362 }); 1363 }); 1364 return future; 1365 } 1366 1367 @Override 1368 public CompletableFuture<Void> split(TableName tableName) { 1369 CompletableFuture<Void> future = new CompletableFuture<>(); 
1370 addListener(tableExists(tableName), (exist, error) -> { 1371 if (error != null) { 1372 future.completeExceptionally(error); 1373 return; 1374 } 1375 if (!exist) { 1376 future.completeExceptionally(new TableNotFoundException(tableName)); 1377 return; 1378 } 1379 addListener(metaTable 1380 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1381 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1382 ClientMetaTableAccessor.QueryType.REGION)) 1383 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1384 ClientMetaTableAccessor.QueryType.REGION))), 1385 (results, err2) -> { 1386 if (err2 != null) { 1387 future.completeExceptionally(err2); 1388 return; 1389 } 1390 if (results != null && !results.isEmpty()) { 1391 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1392 for (Result r : results) { 1393 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1394 continue; 1395 } 1396 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1397 if (rl != null) { 1398 for (HRegionLocation h : rl.getRegionLocations()) { 1399 if (h != null && h.getServerName() != null) { 1400 RegionInfo hri = h.getRegion(); 1401 if ( 1402 hri == null || hri.isSplitParent() 1403 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1404 ) { 1405 continue; 1406 } 1407 splitFutures.add(split(hri, null)); 1408 } 1409 } 1410 } 1411 } 1412 addListener( 1413 CompletableFuture 1414 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1415 (ret, exception) -> { 1416 if (exception != null) { 1417 future.completeExceptionally(exception); 1418 return; 1419 } 1420 future.complete(ret); 1421 }); 1422 } else { 1423 future.complete(null); 1424 } 1425 }); 1426 }); 1427 return future; 1428 } 1429 1430 @Override 1431 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1432 CompletableFuture<Void> result = new CompletableFuture<>(); 1433 if (splitPoint 
== null) { 1434 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1435 } 1436 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1437 (loc, err) -> { 1438 if (err != null) { 1439 result.completeExceptionally(err); 1440 } else if (loc == null || loc.getRegion() == null) { 1441 result.completeExceptionally(new IllegalArgumentException( 1442 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1443 } else { 1444 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1445 if (err2 != null) { 1446 result.completeExceptionally(err2); 1447 } else { 1448 result.complete(ret); 1449 } 1450 1451 }); 1452 } 1453 }); 1454 return result; 1455 } 1456 1457 @Override 1458 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1459 CompletableFuture<Void> future = new CompletableFuture<>(); 1460 addListener(getRegionLocation(regionName), (location, err) -> { 1461 if (err != null) { 1462 future.completeExceptionally(err); 1463 return; 1464 } 1465 RegionInfo regionInfo = location.getRegion(); 1466 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1467 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1468 + "Replicas are auto-split when their primary is split.")); 1469 return; 1470 } 1471 ServerName serverName = location.getServerName(); 1472 if (serverName == null) { 1473 future 1474 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1475 return; 1476 } 1477 addListener(split(regionInfo, null), (ret, err2) -> { 1478 if (err2 != null) { 1479 future.completeExceptionally(err2); 1480 } else { 1481 future.complete(ret); 1482 } 1483 }); 1484 }); 1485 return future; 1486 } 1487 1488 @Override 1489 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1490 Preconditions.checkNotNull(splitPoint, 1491 "splitPoint is null. 
If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1492 CompletableFuture<Void> future = new CompletableFuture<>(); 1493 addListener(getRegionLocation(regionName), (location, err) -> { 1494 if (err != null) { 1495 future.completeExceptionally(err); 1496 return; 1497 } 1498 RegionInfo regionInfo = location.getRegion(); 1499 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1500 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1501 + "Replicas are auto-split when their primary is split.")); 1502 return; 1503 } 1504 ServerName serverName = location.getServerName(); 1505 if (serverName == null) { 1506 future 1507 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1508 return; 1509 } 1510 if ( 1511 regionInfo.getStartKey() != null 1512 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1513 ) { 1514 future.completeExceptionally( 1515 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1516 return; 1517 } 1518 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1519 if (err2 != null) { 1520 future.completeExceptionally(err2); 1521 } else { 1522 future.complete(ret); 1523 } 1524 }); 1525 }); 1526 return future; 1527 } 1528 1529 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1530 CompletableFuture<Void> future = new CompletableFuture<>(); 1531 TableName tableName = hri.getTable(); 1532 final SplitTableRegionRequest request; 1533 try { 1534 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1535 ng.newNonce()); 1536 } catch (DeserializationException e) { 1537 future.completeExceptionally(e); 1538 return future; 1539 } 1540 1541 addListener( 1542 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1543 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1544 (ret, err2) 
-> { 1545 if (err2 != null) { 1546 future.completeExceptionally(err2); 1547 } else { 1548 future.complete(ret); 1549 } 1550 }); 1551 return future; 1552 } 1553 1554 @Override 1555 public CompletableFuture<Void> assign(byte[] regionName) { 1556 CompletableFuture<Void> future = new CompletableFuture<>(); 1557 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1558 if (err != null) { 1559 future.completeExceptionally(err); 1560 return; 1561 } 1562 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1563 .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1564 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1565 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) 1566 .call(), (ret, err2) -> { 1567 if (err2 != null) { 1568 future.completeExceptionally(err2); 1569 } else { 1570 future.complete(ret); 1571 } 1572 }); 1573 }); 1574 return future; 1575 } 1576 1577 @Override 1578 public CompletableFuture<Void> unassign(byte[] regionName) { 1579 CompletableFuture<Void> future = new CompletableFuture<>(); 1580 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1581 if (err != null) { 1582 future.completeExceptionally(err); 1583 return; 1584 } 1585 addListener( 1586 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1587 .action(((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1588 Void> call(controller, stub, 1589 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1590 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) 1591 .call(), 1592 (ret, err2) -> { 1593 if (err2 != null) { 1594 future.completeExceptionally(err2); 1595 } else { 1596 future.complete(ret); 1597 } 1598 }); 1599 }); 1600 return future; 1601 } 1602 1603 @Override 1604 public CompletableFuture<Void> offline(byte[] regionName) { 1605 CompletableFuture<Void> future = new CompletableFuture<>(); 1606 
addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1607 if (err != null) { 1608 future.completeExceptionally(err); 1609 return; 1610 } 1611 addListener( 1612 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1613 .action(((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, 1614 Void> call(controller, stub, 1615 RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1616 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))) 1617 .call(), 1618 (ret, err2) -> { 1619 if (err2 != null) { 1620 future.completeExceptionally(err2); 1621 } else { 1622 future.complete(ret); 1623 } 1624 }); 1625 }); 1626 return future; 1627 } 1628 1629 @Override 1630 public CompletableFuture<Void> move(byte[] regionName) { 1631 CompletableFuture<Void> future = new CompletableFuture<>(); 1632 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1633 if (err != null) { 1634 future.completeExceptionally(err); 1635 return; 1636 } 1637 addListener( 1638 moveRegion(regionInfo, 1639 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1640 (ret, err2) -> { 1641 if (err2 != null) { 1642 future.completeExceptionally(err2); 1643 } else { 1644 future.complete(ret); 1645 } 1646 }); 1647 }); 1648 return future; 1649 } 1650 1651 @Override 1652 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1653 Preconditions.checkNotNull(destServerName, 1654 "destServerName is null. 
If you don't specify a destServerName, use move(byte[]) instead"); 1655 CompletableFuture<Void> future = new CompletableFuture<>(); 1656 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1657 if (err != null) { 1658 future.completeExceptionally(err); 1659 return; 1660 } 1661 addListener( 1662 moveRegion(regionInfo, RequestConverter 1663 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1664 (ret, err2) -> { 1665 if (err2 != null) { 1666 future.completeExceptionally(err2); 1667 } else { 1668 future.complete(ret); 1669 } 1670 }); 1671 }); 1672 return future; 1673 } 1674 1675 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1676 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1677 .action( 1678 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1679 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1680 .call(); 1681 } 1682 1683 @Override 1684 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1685 return this.<Void> newMasterCaller() 1686 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1687 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1688 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1689 .call(); 1690 } 1691 1692 @Override 1693 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1694 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1695 Scan scan = QuotaTableUtil.makeScan(filter); 1696 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1697 new AdvancedScanResultConsumer() { 1698 List<QuotaSettings> settings = new ArrayList<>(); 1699 1700 @Override 1701 public void onNext(Result[] results, ScanController controller) { 1702 for (Result result : results) { 1703 try { 1704 QuotaTableUtil.parseResultToCollection(result, settings); 
1705 } catch (IOException e) { 1706 controller.terminate(); 1707 future.completeExceptionally(e); 1708 } 1709 } 1710 } 1711 1712 @Override 1713 public void onError(Throwable error) { 1714 future.completeExceptionally(error); 1715 } 1716 1717 @Override 1718 public void onComplete() { 1719 future.complete(settings); 1720 } 1721 }); 1722 return future; 1723 } 1724 1725 @Override 1726 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1727 boolean enabled) { 1728 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1729 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1730 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1731 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1732 } 1733 1734 @Override 1735 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1736 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1737 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1738 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1739 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1740 } 1741 1742 @Override 1743 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1744 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1745 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1746 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1747 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1748 } 1749 1750 @Override 1751 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1752 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1753 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1754 (s, c, 
req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1755 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1756 } 1757 1758 @Override 1759 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1760 return this.<ReplicationPeerConfig> newMasterCaller() 1761 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1762 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1763 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1764 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1765 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1766 .call(); 1767 } 1768 1769 @Override 1770 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1771 ReplicationPeerConfig peerConfig) { 1772 return this.<UpdateReplicationPeerConfigRequest, 1773 UpdateReplicationPeerConfigResponse> procedureCall( 1774 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1775 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1776 (resp) -> resp.getProcId(), 1777 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1778 } 1779 1780 @Override 1781 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1782 SyncReplicationState clusterState) { 1783 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1784 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1785 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1786 clusterState), 1787 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1788 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1789 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1790 } 1791 1792 @Override 1793 public CompletableFuture<Void> 
appendReplicationPeerTableCFs(String id, 1794 Map<TableName, List<String>> tableCfs) { 1795 if (tableCfs == null) { 1796 return failedFuture(new ReplicationException("tableCfs is null")); 1797 } 1798 1799 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1800 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1801 if (!completeExceptionally(future, error)) { 1802 ReplicationPeerConfig newPeerConfig = 1803 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1804 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1805 if (!completeExceptionally(future, error)) { 1806 future.complete(result); 1807 } 1808 }); 1809 } 1810 }); 1811 return future; 1812 } 1813 1814 @Override 1815 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1816 Map<TableName, List<String>> tableCfs) { 1817 if (tableCfs == null) { 1818 return failedFuture(new ReplicationException("tableCfs is null")); 1819 } 1820 1821 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1822 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1823 if (!completeExceptionally(future, error)) { 1824 ReplicationPeerConfig newPeerConfig = null; 1825 try { 1826 newPeerConfig = ReplicationPeerConfigUtil 1827 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1828 } catch (ReplicationException e) { 1829 future.completeExceptionally(e); 1830 return; 1831 } 1832 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1833 if (!completeExceptionally(future, error)) { 1834 future.complete(result); 1835 } 1836 }); 1837 } 1838 }); 1839 return future; 1840 } 1841 1842 @Override 1843 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1844 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1845 } 1846 1847 @Override 1848 public CompletableFuture<List<ReplicationPeerDescription>> 
listReplicationPeers(Pattern pattern) {
  // A null pattern is rejected up front; the no-arg overload is the one to use in that case.
  Preconditions.checkNotNull(pattern,
    "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
  return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
}

/**
 * Sends a prepared list-replication-peers request to the master and converts each returned peer
 * description from its protobuf form.
 */
private CompletableFuture<List<ReplicationPeerDescription>>
  listReplicationPeers(ListReplicationPeersRequest request) {
  return this.<List<ReplicationPeerDescription>> newMasterCaller()
    .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
      List<ReplicationPeerDescription>> call(controller, stub, request,
        (s, c, req, done) -> s.listReplicationPeers(c, req, done),
        (resp) -> resp.getPeerDescList().stream()
          .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
          .collect(Collectors.toList())))
    .call();
}

/**
 * Lists, per table, the column families whose scope is not
 * {@link HConstants#REPLICATION_SCOPE_LOCAL}. Tables without any such family are omitted.
 */
@Override
public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
  CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
  addListener(listTableDescriptors(), (tables, error) -> {
    if (!completeExceptionally(future, error)) {
      List<TableCFs> replicatedTableCFs = new ArrayList<>();
      tables.forEach(table -> {
        // family name -> scope, for the non-local families of this table
        Map<String, Integer> cfs = new HashMap<>();
        Stream.of(table.getColumnFamilies())
          .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
          .forEach(column -> {
            cfs.put(column.getNameAsString(), column.getScope());
          });
        if (!cfs.isEmpty()) {
          replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
        }
      });
      future.complete(replicatedTableCFs);
    }
  });
  return future;
}

/**
 * Validates the snapshot description, sends a snapshot request to the master and then waits for
 * the snapshot to finish via {@link #waitSnapshotFinish}.
 * @param snapshotDesc description of the snapshot to take
 * @return a future that fails immediately when validation throws
 *         {@link IllegalArgumentException}, fails when the master call fails, and is otherwise
 *         completed by {@link #waitSnapshotFinish}
 */
@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
  SnapshotProtos.SnapshotDescription snapshot =
    ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
  try {
    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
  } catch (IllegalArgumentException e) {
    return failedFuture(e);
  }
  CompletableFuture<Void> future = new CompletableFuture<>();
  final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
    .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
  addListener(this.<SnapshotResponse> newMasterCaller()
    .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
      controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
    .call(), (resp, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      waitSnapshotFinish(snapshotDesc, future, resp);
    });
  return future;
}

// This is kept for compatibility with the old implementation.
// If the response carries a procId, the snapshot is run by a SnapshotProcedure and we wait for
// the procedure result; otherwise the snapshot is coordinated by zk and we poll
// isSnapshotFinished until it reports done or the expected timeout elapses.
private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
  SnapshotResponse resp) {
  if (resp.hasProcId()) {
    getProcedureResult(resp.getProcId(), future, 0);
    addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
  } else {
    long expectedTimeout = resp.getExpectedTimeout();
    TimerTask pollingTask = new TimerTask() {
      // number of polls that found the snapshot unfinished so far
      int tries = 0;
      long startTime = EnvironmentEdgeManager.currentTime();
      // polling gives up once the clock reaches this time
      long endTime = startTime + expectedTimeout;
      // upper bound for the pause between two polls
      long maxPauseTime = expectedTimeout / maxAttempts;

      @Override
      public void run(Timeout timeout) throws Exception {
        if (EnvironmentEdgeManager.currentTime() < endTime) {
          addListener(isSnapshotFinished(snapshot), (done, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else if (done) {
              future.complete(null);
            } else {
              // Not done yet: re-schedule this same task after a pause capped at maxPauseTime.
              long pauseTime =
                ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
              pauseTime = Math.min(pauseTime, maxPauseTime);
              // 'this' inside the lambda is the enclosing TimerTask.
              AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
            }
          });
        } else {
          future
            .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
              + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
        }
      }
    };
    // Kick off the first poll after 1 ms.
    AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
  }
}

/** Asks the master whether the given snapshot is done. */
@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
  return this.<Boolean> newMasterCaller()
    .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
      Boolean> call(controller, stub,
        IsSnapshotDoneRequest.newBuilder()
          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
        (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
    .call();
}

/**
 * Restores a snapshot, reading the take-failsafe-snapshot flag from the connection
 * configuration and delegating to the two-argument overload (declared outside this section).
 */
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
  boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
    HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
    HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
  return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
}

/**
 * Restores a snapshot onto the table it was taken from.
 * <p>
 * The table name is found by listing snapshots and picking the one whose name equals
 * {@code snapshotName}. If that table does not exist the snapshot is restored into a new table;
 * if it exists it must be disabled.
 * @param snapshotName         name of the snapshot; also compiled as the listing pattern
 * @param takeFailSafeSnapshot whether to take a failsafe snapshot before restoring onto an
 *                             existing table
 * @param restoreAcl           passed through to the restore request
 * @return a future that fails with {@link RestoreSnapshotException} when no matching snapshot is
 *         found, with {@link TableNotDisabledException} when the table exists and is not
 *         disabled, or with any error of the underlying calls
 */
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
  boolean restoreAcl) {
  CompletableFuture<Void> future = new CompletableFuture<>();
  addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
    if (err != null) {
      future.completeExceptionally(err);
      return;
    }
    TableName tableName = null;
    if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
      // The pattern may match more than one snapshot; only an exact name match counts.
      for (SnapshotDescription snap : snapshotDescriptions) {
        if (snap.getName().equals(snapshotName)) {
          tableName = snap.getTableName();
          break;
        }
      }
    }
    if (tableName == null) {
      future.completeExceptionally(new RestoreSnapshotException(
        "Unable to find the table name for snapshot=" + snapshotName));
      return;
    }
    // Lambdas below need an effectively final reference.
    final TableName finalTableName = tableName;
    addListener(tableExists(finalTableName), (exists, err2) -> {
      if (err2 != null) {
        future.completeExceptionally(err2);
      } else if (!exists) {
        // if table does not exist, then just clone snapshot into new table.
        completeConditionalOnFuture(future,
          internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
      } else {
        addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
          if (err4 != null) {
            future.completeExceptionally(err4);
          } else if (!disabled) {
            future.completeExceptionally(new TableNotDisabledException(finalTableName));
          } else {
            completeConditionalOnFuture(future,
              restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
          }
        });
      }
    });
  });
  return future;
}

/**
 * Restores a snapshot onto an existing table, optionally guarded by a failsafe snapshot.
 * <p>
 * With {@code takeFailSafeSnapshot} set: take a failsafe snapshot, restore, and on restore
 * failure roll back to the failsafe snapshot. The returned future then always fails (with the
 * rollback error, or with a {@link RestoreSnapshotException} wrapping the restore error). On
 * restore success the failsafe snapshot is deleted; a deletion failure is only logged.
 */
private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
  boolean takeFailSafeSnapshot, boolean restoreAcl) {
  if (takeFailSafeSnapshot) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    // Step.1 Take a snapshot of the current state
    String failSafeSnapshotSnapshotNameFormat =
      this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
        HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
    // Fill the placeholders of the configured name format.
    final String failSafeSnapshotSnapshotName =
      failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
        .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
        .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
    LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
    addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
      } else {
        // Step.2 Restore snapshot
        addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
          (void2, err2) -> {
            if (err2 != null) {
              // Step.3.a Something went wrong during the restore; try to roll back.
              addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
                restoreAcl, null), (void3, err3) -> {
                  if (err3 != null) {
                    future.completeExceptionally(err3);
                  } else {
                    // Rollback worked, but the requested restore still failed.
                    String msg =
                      "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
                        + failSafeSnapshotSnapshotName + " succeeded.";
                    future.completeExceptionally(new RestoreSnapshotException(msg, err2));
                  }
                });
            } else {
              // Step.3.b If the restore succeeded, delete the pre-restore snapshot.
              LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
              addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
                if (err3 != null) {
                  LOG.error(
                    "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
                    err3);
                }
                // The restore itself succeeded, so complete normally either way.
                future.complete(ret3);
              });
            }
          });
      }
    });
    return future;
  } else {
    return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
  }
}

/**
 * Forwards the outcome of {@code parentFuture} to {@code dependentFuture}: its error when it
 * fails, its value otherwise.
 */
private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
  CompletableFuture<T> parentFuture) {
  addListener(parentFuture, (res, err) -> {
    if (err != null) {
      dependentFuture.completeExceptionally(err);
    } else {
      dependentFuture.complete(res);
    }
  });
}

/**
 * Clones a snapshot into a new table.
 * @return a future that fails with {@link TableExistsException} when {@code tableName} already
 *         exists, and otherwise follows {@code internalRestoreSnapshot}
 */
@Override
public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
  boolean restoreAcl, String customSFT) {
  CompletableFuture<Void> future = new CompletableFuture<>();
  addListener(tableExists(tableName), (exists, err) -> {
    if (err != null) {
      future.completeExceptionally(err);
    } else if (exists) {
      future.completeExceptionally(new TableExistsException(tableName));
    } else {
      completeConditionalOnFuture(future,
        internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
    }
  });
  return future;
}
private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2107 boolean restoreAcl, String customSFT) { 2108 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2109 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2110 try { 2111 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2112 } catch (IllegalArgumentException e) { 2113 return failedFuture(e); 2114 } 2115 RestoreSnapshotRequest.Builder builder = 2116 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2117 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2118 if (customSFT != null) { 2119 builder.setCustomSFT(customSFT); 2120 } 2121 return waitProcedureResult(this.<Long> newMasterCaller() 2122 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2123 Long> call(controller, stub, builder.build(), 2124 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2125 .call()); 2126 } 2127 2128 @Override 2129 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2130 return getCompletedSnapshots(null); 2131 } 2132 2133 @Override 2134 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2135 Preconditions.checkNotNull(pattern, 2136 "pattern is null. 
If you don't specify a pattern, use listSnapshots() instead"); 2137 return getCompletedSnapshots(pattern); 2138 } 2139 2140 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2141 return this.<List<SnapshotDescription>> newMasterCaller() 2142 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2143 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2144 GetCompletedSnapshotsRequest.newBuilder().build(), 2145 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2146 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2147 .call(); 2148 } 2149 2150 @Override 2151 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2152 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2153 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2154 return getCompletedSnapshots(tableNamePattern, null); 2155 } 2156 2157 @Override 2158 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2159 Pattern snapshotNamePattern) { 2160 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2161 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2162 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
2163 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2164 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2165 } 2166 2167 private CompletableFuture<List<SnapshotDescription>> 2168 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2169 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2170 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2171 if (err != null) { 2172 future.completeExceptionally(err); 2173 return; 2174 } 2175 if (tableNames == null || tableNames.size() <= 0) { 2176 future.complete(Collections.emptyList()); 2177 return; 2178 } 2179 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2180 if (err2 != null) { 2181 future.completeExceptionally(err2); 2182 return; 2183 } 2184 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2185 future.complete(Collections.emptyList()); 2186 return; 2187 } 2188 future.complete(snapshotDescList.stream() 2189 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2190 .collect(Collectors.toList())); 2191 }); 2192 }); 2193 return future; 2194 } 2195 2196 @Override 2197 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2198 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2199 } 2200 2201 @Override 2202 public CompletableFuture<Void> deleteSnapshots() { 2203 return internalDeleteSnapshots(null, null); 2204 } 2205 2206 @Override 2207 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2208 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
2209 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2210 return internalDeleteSnapshots(null, snapshotNamePattern); 2211 } 2212 2213 @Override 2214 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2215 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2216 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2217 return internalDeleteSnapshots(tableNamePattern, null); 2218 } 2219 2220 @Override 2221 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2222 Pattern snapshotNamePattern) { 2223 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2224 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2225 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2226 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2227 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2228 } 2229 2230 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2231 Pattern snapshotNamePattern) { 2232 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2233 if (tableNamePattern == null) { 2234 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2235 } else { 2236 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2237 } 2238 CompletableFuture<Void> future = new CompletableFuture<>(); 2239 addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> { 2240 if (err != null) { 2241 future.completeExceptionally(err); 2242 return; 2243 } 2244 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2245 future.complete(null); 2246 return; 2247 } 2248 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2249 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2250 if (e != null) { 
2251 future.completeExceptionally(e); 2252 } else { 2253 future.complete(v); 2254 } 2255 }); 2256 })); 2257 return future; 2258 } 2259 2260 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2261 return this.<Void> newMasterCaller() 2262 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2263 controller, stub, 2264 DeleteSnapshotRequest.newBuilder() 2265 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2266 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2267 .call(); 2268 } 2269 2270 @Override 2271 public CompletableFuture<Void> execProcedure(String signature, String instance, 2272 Map<String, String> props) { 2273 CompletableFuture<Void> future = new CompletableFuture<>(); 2274 ProcedureDescription procDesc = 2275 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2276 addListener(this.<Long> newMasterCaller() 2277 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2278 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2279 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2280 .call(), (expectedTimeout, err) -> { 2281 if (err != null) { 2282 future.completeExceptionally(err); 2283 return; 2284 } 2285 TimerTask pollingTask = new TimerTask() { 2286 int tries = 0; 2287 long startTime = EnvironmentEdgeManager.currentTime(); 2288 long endTime = startTime + expectedTimeout; 2289 long maxPauseTime = expectedTimeout / maxAttempts; 2290 2291 @Override 2292 public void run(Timeout timeout) throws Exception { 2293 if (EnvironmentEdgeManager.currentTime() < endTime) { 2294 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2295 if (err2 != null) { 2296 future.completeExceptionally(err2); 2297 return; 2298 } 2299 if (done) { 2300 future.complete(null); 2301 } else { 2302 // retry again after 
pauseTime. 2303 long pauseTime = 2304 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2305 pauseTime = Math.min(pauseTime, maxPauseTime); 2306 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2307 TimeUnit.MICROSECONDS); 2308 } 2309 }); 2310 } else { 2311 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2312 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2313 } 2314 } 2315 }; 2316 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2317 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2318 }); 2319 return future; 2320 } 2321 2322 @Override 2323 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2324 Map<String, String> props) { 2325 ProcedureDescription proDesc = 2326 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2327 return this.<byte[]> newMasterCaller() 2328 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2329 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2330 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2331 resp -> resp.hasReturnData() ? 
resp.getReturnData().toByteArray() : null)) 2332 .call(); 2333 } 2334 2335 @Override 2336 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2337 Map<String, String> props) { 2338 ProcedureDescription proDesc = 2339 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2340 return this.<Boolean> newMasterCaller() 2341 .action( 2342 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2343 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2344 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2345 .call(); 2346 } 2347 2348 @Override 2349 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2350 return this.<Boolean> newMasterCaller().action( 2351 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2352 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2353 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2354 .call(); 2355 } 2356 2357 @Override 2358 public CompletableFuture<String> getProcedures() { 2359 return this.<String> newMasterCaller() 2360 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2361 controller, stub, GetProceduresRequest.newBuilder().build(), 2362 (s, c, req, done) -> s.getProcedures(c, req, done), 2363 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2364 .call(); 2365 } 2366 2367 @Override 2368 public CompletableFuture<String> getLocks() { 2369 return this.<String> newMasterCaller() 2370 .action( 2371 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2372 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2373 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2374 .call(); 2375 } 2376 2377 @Override 2378 
public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2379 boolean offload) { 2380 return this.<Void> newMasterCaller() 2381 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2382 DecommissionRegionServersResponse, Void> call(controller, stub, 2383 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2384 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2385 .call(); 2386 } 2387 2388 @Override 2389 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2390 return this.<List<ServerName>> newMasterCaller() 2391 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2392 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2393 ListDecommissionedRegionServersRequest.newBuilder().build(), 2394 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2395 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2396 .collect(Collectors.toList()))) 2397 .call(); 2398 } 2399 2400 @Override 2401 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2402 List<byte[]> encodedRegionNames) { 2403 return this.<Void> newMasterCaller() 2404 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2405 RecommissionRegionServerResponse, Void> call(controller, stub, 2406 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2407 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2408 .call(); 2409 } 2410 2411 /** 2412 * Get the region location for the passed region name. The region name may be a full region name 2413 * or encoded region name. 
If the region does not found, then it'll throw an 2414 * UnknownRegionException wrapped by a {@link CompletableFuture} 2415 * @param regionNameOrEncodedRegionName region name or encoded region name 2416 * @return region location, wrapped by a {@link CompletableFuture} 2417 */ 2418 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2419 if (regionNameOrEncodedRegionName == null) { 2420 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2421 } 2422 2423 CompletableFuture<Optional<HRegionLocation>> future; 2424 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2425 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2426 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2427 // old format encodedName, should be meta region 2428 future = connection.registry.getMetaRegionLocations() 2429 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2430 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2431 } else { 2432 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2433 regionNameOrEncodedRegionName); 2434 } 2435 } else { 2436 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2437 // it needs to throw out IllegalArgumentException in case tableName is passed in. 
2438 RegionInfo regionInfo; 2439 try { 2440 regionInfo = 2441 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2442 } catch (IOException ioe) { 2443 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2444 } 2445 2446 if (regionInfo.isMetaRegion()) { 2447 future = connection.registry.getMetaRegionLocations() 2448 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2449 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2450 .findFirst()); 2451 } else { 2452 future = 2453 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2454 } 2455 } 2456 2457 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2458 addListener(future, (location, err) -> { 2459 if (err != null) { 2460 returnedFuture.completeExceptionally(err); 2461 return; 2462 } 2463 if (!location.isPresent() || location.get().getRegion() == null) { 2464 returnedFuture.completeExceptionally( 2465 new UnknownRegionException("Invalid region name or encoded region name: " 2466 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2467 } else { 2468 returnedFuture.complete(location.get()); 2469 } 2470 }); 2471 return returnedFuture; 2472 } 2473 2474 /** 2475 * Get the region info for the passed region name. The region name may be a full region name or 2476 * encoded region name. 
If the region does not found, then it'll throw an UnknownRegionException 2477 * wrapped by a {@link CompletableFuture} 2478 * @return region info, wrapped by a {@link CompletableFuture} 2479 */ 2480 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2481 if (regionNameOrEncodedRegionName == null) { 2482 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2483 } 2484 2485 if ( 2486 Bytes.equals(regionNameOrEncodedRegionName, 2487 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2488 || Bytes.equals(regionNameOrEncodedRegionName, 2489 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2490 ) { 2491 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2492 } 2493 2494 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2495 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2496 if (err != null) { 2497 future.completeExceptionally(err); 2498 } else { 2499 future.complete(location.getRegion()); 2500 } 2501 }); 2502 return future; 2503 } 2504 2505 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2506 if (numRegions < 3) { 2507 throw new IllegalArgumentException("Must create at least three regions"); 2508 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2509 throw new IllegalArgumentException("Start key must be smaller than end key"); 2510 } 2511 if (numRegions == 3) { 2512 return new byte[][] { startKey, endKey }; 2513 } 2514 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2515 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2516 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2517 } 2518 return splitKeys; 2519 } 2520 2521 private void verifySplitKeys(byte[][] splitKeys) { 2522 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2523 // Verify there are no duplicate split keys 2524 byte[] lastKey = 
null; 2525 for (byte[] splitKey : splitKeys) { 2526 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2527 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2528 } 2529 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2530 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2531 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2532 } 2533 lastKey = splitKey; 2534 } 2535 } 2536 2537 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2538 2539 abstract void onFinished(); 2540 2541 abstract void onError(Throwable error); 2542 2543 @Override 2544 public void accept(Void v, Throwable error) { 2545 if (error != null) { 2546 onError(error); 2547 return; 2548 } 2549 onFinished(); 2550 } 2551 } 2552 2553 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2554 protected final TableName tableName; 2555 2556 TableProcedureBiConsumer(TableName tableName) { 2557 this.tableName = tableName; 2558 } 2559 2560 abstract String getOperationType(); 2561 2562 String getDescription() { 2563 return "Operation: " + getOperationType() + ", " + "Table Name: " 2564 + tableName.getNameWithNamespaceInclAsString(); 2565 } 2566 2567 @Override 2568 void onFinished() { 2569 LOG.info(getDescription() + " completed"); 2570 } 2571 2572 @Override 2573 void onError(Throwable error) { 2574 LOG.info(getDescription() + " failed with " + error.getMessage()); 2575 } 2576 } 2577 2578 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2579 protected final String namespaceName; 2580 2581 NamespaceProcedureBiConsumer(String namespaceName) { 2582 this.namespaceName = namespaceName; 2583 } 2584 2585 abstract String getOperationType(); 2586 2587 String getDescription() { 2588 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2589 } 2590 2591 
@Override 2592 void onFinished() { 2593 LOG.info(getDescription() + " completed"); 2594 } 2595 2596 @Override 2597 void onError(Throwable error) { 2598 LOG.info(getDescription() + " failed with " + error.getMessage()); 2599 } 2600 } 2601 2602 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2603 2604 CreateTableProcedureBiConsumer(TableName tableName) { 2605 super(tableName); 2606 } 2607 2608 @Override 2609 String getOperationType() { 2610 return "CREATE"; 2611 } 2612 } 2613 2614 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2615 2616 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2617 super(tableName); 2618 } 2619 2620 @Override 2621 String getOperationType() { 2622 return "MODIFY"; 2623 } 2624 } 2625 2626 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2627 extends TableProcedureBiConsumer { 2628 2629 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2630 super(tableName); 2631 } 2632 2633 @Override 2634 String getOperationType() { 2635 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2636 } 2637 } 2638 2639 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2640 2641 DeleteTableProcedureBiConsumer(TableName tableName) { 2642 super(tableName); 2643 } 2644 2645 @Override 2646 String getOperationType() { 2647 return "DELETE"; 2648 } 2649 2650 @Override 2651 void onFinished() { 2652 connection.getLocator().clearCache(this.tableName); 2653 super.onFinished(); 2654 } 2655 } 2656 2657 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2658 2659 TruncateTableProcedureBiConsumer(TableName tableName) { 2660 super(tableName); 2661 } 2662 2663 @Override 2664 String getOperationType() { 2665 return "TRUNCATE"; 2666 } 2667 } 2668 2669 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2670 2671 
EnableTableProcedureBiConsumer(TableName tableName) { 2672 super(tableName); 2673 } 2674 2675 @Override 2676 String getOperationType() { 2677 return "ENABLE"; 2678 } 2679 } 2680 2681 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2682 2683 DisableTableProcedureBiConsumer(TableName tableName) { 2684 super(tableName); 2685 } 2686 2687 @Override 2688 String getOperationType() { 2689 return "DISABLE"; 2690 } 2691 } 2692 2693 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2694 2695 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2696 super(tableName); 2697 } 2698 2699 @Override 2700 String getOperationType() { 2701 return "ADD_COLUMN_FAMILY"; 2702 } 2703 } 2704 2705 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2706 2707 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2708 super(tableName); 2709 } 2710 2711 @Override 2712 String getOperationType() { 2713 return "DELETE_COLUMN_FAMILY"; 2714 } 2715 } 2716 2717 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2718 2719 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2720 super(tableName); 2721 } 2722 2723 @Override 2724 String getOperationType() { 2725 return "MODIFY_COLUMN_FAMILY"; 2726 } 2727 } 2728 2729 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2730 extends TableProcedureBiConsumer { 2731 2732 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2733 super(tableName); 2734 } 2735 2736 @Override 2737 String getOperationType() { 2738 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2739 } 2740 } 2741 2742 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2743 2744 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2745 super(namespaceName); 2746 } 2747 2748 @Override 2749 String getOperationType() { 
2750 return "CREATE_NAMESPACE"; 2751 } 2752 } 2753 2754 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2755 2756 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2757 super(namespaceName); 2758 } 2759 2760 @Override 2761 String getOperationType() { 2762 return "DELETE_NAMESPACE"; 2763 } 2764 } 2765 2766 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2767 2768 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2769 super(namespaceName); 2770 } 2771 2772 @Override 2773 String getOperationType() { 2774 return "MODIFY_NAMESPACE"; 2775 } 2776 } 2777 2778 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2779 2780 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2781 super(tableName); 2782 } 2783 2784 @Override 2785 String getOperationType() { 2786 return "MERGE_REGIONS"; 2787 } 2788 } 2789 2790 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2791 2792 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2793 super(tableName); 2794 } 2795 2796 @Override 2797 String getOperationType() { 2798 return "SPLIT_REGION"; 2799 } 2800 } 2801 2802 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 2803 SnapshotProcedureBiConsumer(TableName tableName) { 2804 super(tableName); 2805 } 2806 2807 @Override 2808 String getOperationType() { 2809 return "SNAPSHOT"; 2810 } 2811 } 2812 2813 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2814 private final String peerId; 2815 private final Supplier<String> getOperation; 2816 2817 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2818 this.peerId = peerId; 2819 this.getOperation = getOperation; 2820 } 2821 2822 String getDescription() { 2823 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2824 } 2825 2826 @Override 
2827 void onFinished() { 2828 LOG.info(getDescription() + " completed"); 2829 } 2830 2831 @Override 2832 void onError(Throwable error) { 2833 LOG.info(getDescription() + " failed with " + error.getMessage()); 2834 } 2835 } 2836 2837 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2838 CompletableFuture<Void> future = new CompletableFuture<>(); 2839 addListener(procFuture, (procId, error) -> { 2840 if (error != null) { 2841 future.completeExceptionally(error); 2842 return; 2843 } 2844 getProcedureResult(procId, future, 0); 2845 }); 2846 return future; 2847 } 2848 2849 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2850 addListener( 2851 this.<GetProcedureResultResponse> newMasterCaller() 2852 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 2853 GetProcedureResultResponse> call(controller, stub, 2854 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2855 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2856 .call(), 2857 (response, error) -> { 2858 if (error != null) { 2859 LOG.warn("failed to get the procedure result procId={}", procId, 2860 ConnectionUtils.translateException(error)); 2861 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2862 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2863 return; 2864 } 2865 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2866 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2867 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2868 return; 2869 } 2870 if (response.hasException()) { 2871 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2872 future.completeExceptionally(ioe); 2873 } else { 2874 future.complete(null); 2875 } 2876 }); 2877 } 2878 2879 private <T> CompletableFuture<T> failedFuture(Throwable error) 
{ 2880 CompletableFuture<T> future = new CompletableFuture<>(); 2881 future.completeExceptionally(error); 2882 return future; 2883 } 2884 2885 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2886 if (error != null) { 2887 future.completeExceptionally(error); 2888 return true; 2889 } 2890 return false; 2891 } 2892 2893 @Override 2894 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2895 return getClusterMetrics(EnumSet.allOf(Option.class)); 2896 } 2897 2898 @Override 2899 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2900 return this.<ClusterMetrics> newMasterCaller() 2901 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 2902 ClusterMetrics> call(controller, stub, 2903 RequestConverter.buildGetClusterStatusRequest(options), 2904 (s, c, req, done) -> s.getClusterStatus(c, req, done), 2905 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 2906 .call(); 2907 } 2908 2909 @Override 2910 public CompletableFuture<Void> shutdown() { 2911 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2912 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2913 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 2914 resp -> null)) 2915 .call(); 2916 } 2917 2918 @Override 2919 public CompletableFuture<Void> stopMaster() { 2920 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2921 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 2922 controller, stub, StopMasterRequest.newBuilder().build(), 2923 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 2924 .call(); 2925 } 2926 2927 @Override 2928 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2929 StopServerRequest request = RequestConverter 2930 .buildStopServerRequest("Called by admin client " + 
this.connection.toString()); 2931 return this.<Void> newAdminCaller().priority(HIGH_QOS) 2932 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2933 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2934 resp -> null)) 2935 .serverName(serverName).call(); 2936 } 2937 2938 @Override 2939 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2940 return this.<Void> newAdminCaller() 2941 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 2942 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2943 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2944 .serverName(serverName).call(); 2945 } 2946 2947 @Override 2948 public CompletableFuture<Void> updateConfiguration() { 2949 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2950 addListener( 2951 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 2952 (status, err) -> { 2953 if (err != null) { 2954 future.completeExceptionally(err); 2955 } else { 2956 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2957 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 2958 futures.add(updateConfiguration(status.getMasterName())); 2959 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2960 addListener( 2961 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2962 (result, err2) -> { 2963 if (err2 != null) { 2964 future.completeExceptionally(err2); 2965 } else { 2966 future.complete(result); 2967 } 2968 }); 2969 } 2970 }); 2971 return future; 2972 } 2973 2974 @Override 2975 public CompletableFuture<Void> updateConfiguration(String groupName) { 2976 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2977 addListener(getRSGroup(groupName), (rsGroupInfo, 
err) -> { 2978 if (err != null) { 2979 future.completeExceptionally(err); 2980 } else if (rsGroupInfo == null) { 2981 future.completeExceptionally( 2982 new IllegalArgumentException("Group does not exist: " + groupName)); 2983 } else { 2984 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 2985 if (err2 != null) { 2986 future.completeExceptionally(err2); 2987 } else { 2988 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2989 List<ServerName> groupServers = status.getServersName().stream() 2990 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 2991 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 2992 addListener( 2993 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2994 (result, err3) -> { 2995 if (err3 != null) { 2996 future.completeExceptionally(err3); 2997 } else { 2998 future.complete(result); 2999 } 3000 }); 3001 } 3002 }); 3003 } 3004 }); 3005 return future; 3006 } 3007 3008 @Override 3009 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3010 return this.<Void> newAdminCaller() 3011 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3012 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3013 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3014 .serverName(serverName).call(); 3015 } 3016 3017 @Override 3018 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3019 return this.<Void> newAdminCaller() 3020 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3021 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3022 RequestConverter.buildClearCompactionQueuesRequest(queues), 3023 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3024 .serverName(serverName).call(); 3025 } 3026 3027 @Override 3028 public 
CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3029 return this.<List<SecurityCapability>> newMasterCaller() 3030 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3031 List<SecurityCapability>> call(controller, stub, 3032 SecurityCapabilitiesRequest.newBuilder().build(), 3033 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3034 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3035 .call(); 3036 } 3037 3038 @Override 3039 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3040 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3041 } 3042 3043 @Override 3044 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3045 TableName tableName) { 3046 Preconditions.checkNotNull(tableName, 3047 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3048 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3049 } 3050 3051 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3052 ServerName serverName) { 3053 return this.<List<RegionMetrics>> newAdminCaller() 3054 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3055 List<RegionMetrics>> adminCall(controller, stub, request, 3056 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3057 RegionMetricsBuilder::toRegionMetrics)) 3058 .serverName(serverName).call(); 3059 } 3060 3061 @Override 3062 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3063 return this.<Boolean> newMasterCaller() 3064 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3065 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3066 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3067 resp -> 
resp.getInMaintenanceMode())) 3068 .call(); 3069 } 3070 3071 @Override 3072 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3073 CompactType compactType) { 3074 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3075 3076 switch (compactType) { 3077 case MOB: 3078 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3079 if (err != null) { 3080 future.completeExceptionally(err); 3081 return; 3082 } 3083 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3084 3085 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3086 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3087 GetRegionInfoResponse> adminCall(controller, stub, 3088 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3089 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3090 .call(), (resp2, err2) -> { 3091 if (err2 != null) { 3092 future.completeExceptionally(err2); 3093 } else { 3094 if (resp2.hasCompactionState()) { 3095 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3096 } else { 3097 future.complete(CompactionState.NONE); 3098 } 3099 } 3100 }); 3101 }); 3102 break; 3103 case NORMAL: 3104 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3105 if (err != null) { 3106 future.completeExceptionally(err); 3107 return; 3108 } 3109 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3110 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3111 locations.stream().filter(loc -> loc.getServerName() != null) 3112 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3113 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3114 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3115 // If any region compaction state is 
MAJOR_AND_MINOR 3116 // the table compaction state is MAJOR_AND_MINOR, too. 3117 if (err2 != null) { 3118 future.completeExceptionally(unwrapCompletionException(err2)); 3119 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3120 future.complete(regionState); 3121 } else { 3122 regionStates.add(regionState); 3123 } 3124 })); 3125 }); 3126 addListener( 3127 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3128 (ret, err3) -> { 3129 // If future not completed, check all regions's compaction state 3130 if (!future.isCompletedExceptionally() && !future.isDone()) { 3131 CompactionState state = CompactionState.NONE; 3132 for (CompactionState regionState : regionStates) { 3133 switch (regionState) { 3134 case MAJOR: 3135 if (state == CompactionState.MINOR) { 3136 future.complete(CompactionState.MAJOR_AND_MINOR); 3137 } else { 3138 state = CompactionState.MAJOR; 3139 } 3140 break; 3141 case MINOR: 3142 if (state == CompactionState.MAJOR) { 3143 future.complete(CompactionState.MAJOR_AND_MINOR); 3144 } else { 3145 state = CompactionState.MINOR; 3146 } 3147 break; 3148 case NONE: 3149 default: 3150 } 3151 } 3152 if (!future.isDone()) { 3153 future.complete(state); 3154 } 3155 } 3156 }); 3157 }); 3158 break; 3159 default: 3160 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3161 } 3162 3163 return future; 3164 } 3165 3166 @Override 3167 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3168 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3169 addListener(getRegionLocation(regionName), (location, err) -> { 3170 if (err != null) { 3171 future.completeExceptionally(err); 3172 return; 3173 } 3174 ServerName serverName = location.getServerName(); 3175 if (serverName == null) { 3176 future 3177 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3178 return; 3179 } 3180 addListener( 3181 
this.<GetRegionInfoResponse> newAdminCaller() 3182 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3183 GetRegionInfoResponse> adminCall(controller, stub, 3184 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3185 true), 3186 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3187 .serverName(serverName).call(), 3188 (resp2, err2) -> { 3189 if (err2 != null) { 3190 future.completeExceptionally(err2); 3191 } else { 3192 if (resp2.hasCompactionState()) { 3193 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3194 } else { 3195 future.complete(CompactionState.NONE); 3196 } 3197 } 3198 }); 3199 }); 3200 return future; 3201 } 3202 3203 @Override 3204 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3205 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3206 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3207 return this.<Optional<Long>> newMasterCaller() 3208 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3209 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3210 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3211 ProtobufUtil::toOptionalTimestamp)) 3212 .call(); 3213 } 3214 3215 @Override 3216 public CompletableFuture<Optional<Long>> 3217 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3218 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3219 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3220 addListener(getRegionInfo(regionName), (region, err) -> { 3221 if (err != null) { 3222 future.completeExceptionally(err); 3223 return; 3224 } 3225 MajorCompactionTimestampForRegionRequest.Builder builder = 3226 MajorCompactionTimestampForRegionRequest.newBuilder(); 3227 builder.setRegion( 3228 
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3229 addListener(this.<Optional<Long>> newMasterCaller() 3230 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3231 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3232 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3233 ProtobufUtil::toOptionalTimestamp)) 3234 .call(), (timestamp, err2) -> { 3235 if (err2 != null) { 3236 future.completeExceptionally(err2); 3237 } else { 3238 future.complete(timestamp); 3239 } 3240 }); 3241 }); 3242 return future; 3243 } 3244 3245 @Override 3246 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3247 List<String> serverNamesList) { 3248 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3249 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3250 if (err != null) { 3251 future.completeExceptionally(err); 3252 return; 3253 } 3254 // Accessed by multiple threads. 
3255 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3256 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3257 serverNames.stream().forEach(serverName -> { 3258 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3259 if (err2 != null) { 3260 future.completeExceptionally(unwrapCompletionException(err2)); 3261 } else { 3262 serverStates.put(serverName, serverState); 3263 } 3264 })); 3265 }); 3266 addListener( 3267 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3268 (ret, err3) -> { 3269 if (!future.isCompletedExceptionally()) { 3270 if (err3 != null) { 3271 future.completeExceptionally(err3); 3272 } else { 3273 future.complete(serverStates); 3274 } 3275 } 3276 }); 3277 }); 3278 return future; 3279 } 3280 3281 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3282 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3283 if (serverNamesList.isEmpty()) { 3284 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3285 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3286 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3287 if (err != null) { 3288 future.completeExceptionally(err); 3289 } else { 3290 future.complete(clusterMetrics.getServersName()); 3291 } 3292 }); 3293 return future; 3294 } else { 3295 List<ServerName> serverList = new ArrayList<>(); 3296 for (String regionServerName : serverNamesList) { 3297 ServerName serverName = null; 3298 try { 3299 serverName = ServerName.valueOf(regionServerName); 3300 } catch (Exception e) { 3301 future.completeExceptionally( 3302 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3303 } 3304 if (serverName == null) { 3305 future.completeExceptionally( 3306 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 
3307 } else { 3308 serverList.add(serverName); 3309 } 3310 } 3311 future.complete(serverList); 3312 } 3313 return future; 3314 } 3315 3316 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3317 return this.<Boolean> newAdminCaller().serverName(serverName) 3318 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3319 Boolean> adminCall(controller, stub, 3320 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3321 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3322 .call(); 3323 } 3324 3325 @Override 3326 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3327 return this.<Boolean> newMasterCaller() 3328 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3329 Boolean> call(controller, stub, 3330 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3331 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3332 (resp) -> resp.getPrevBalanceValue())) 3333 .call(); 3334 } 3335 3336 @Override 3337 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3338 return this.<BalanceResponse> newMasterCaller() 3339 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3340 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3341 (s, c, req, done) -> s.balance(c, req, done), 3342 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3343 .call(); 3344 } 3345 3346 @Override 3347 public CompletableFuture<Boolean> isBalancerEnabled() { 3348 return this.<Boolean> newMasterCaller() 3349 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3350 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3351 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3352 .call(); 3353 } 3354 3355 @Override 
3356 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3357 return this.<Boolean> newMasterCaller() 3358 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3359 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3360 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3361 (resp) -> resp.getPrevNormalizerValue())) 3362 .call(); 3363 } 3364 3365 @Override 3366 public CompletableFuture<Boolean> isNormalizerEnabled() { 3367 return this.<Boolean> newMasterCaller() 3368 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3369 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3370 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3371 .call(); 3372 } 3373 3374 @Override 3375 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3376 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3377 } 3378 3379 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3380 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3381 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3382 } 3383 3384 @Override 3385 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3386 return this.<Boolean> newMasterCaller() 3387 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3388 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3389 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3390 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3391 (resp) -> resp.getPrevValue())) 3392 .call(); 3393 } 3394 3395 @Override 3396 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3397 return this.<Boolean> newMasterCaller() 3398 .action( 3399 (controller, stub) -> 
this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3400 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3401 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3402 .call(); 3403 } 3404 3405 @Override 3406 public CompletableFuture<Boolean> runCleanerChore() { 3407 return this.<Boolean> newMasterCaller() 3408 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3409 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3410 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3411 (resp) -> resp.getCleanerChoreRan())) 3412 .call(); 3413 } 3414 3415 @Override 3416 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3417 return this.<Boolean> newMasterCaller() 3418 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3419 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3420 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3421 .call(); 3422 } 3423 3424 @Override 3425 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3426 return this.<Boolean> newMasterCaller() 3427 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3428 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3429 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3430 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3431 .call(); 3432 } 3433 3434 @Override 3435 public CompletableFuture<Integer> runCatalogJanitor() { 3436 return this.<Integer> newMasterCaller() 3437 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3438 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3439 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3440 
.call();
  }

  /**
   * Invokes a coprocessor endpoint through a channel backed by a master caller.
   * @param stubMaker builds the service stub from the RPC channel
   * @param callable  performs the call on the stub
   * @return a future completed with the response, or failed with the controller's failure when the
   *         controller reports one
   */
  @Override
  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable) {
    CompletableFuture<R> outcome = new CompletableFuture<>();
    S serviceStub =
      stubMaker.apply(new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()));
    ClientCoprocessorRpcController rpcController = new ClientCoprocessorRpcController();
    callable.call(serviceStub, rpcController, response -> {
      if (!rpcController.failed()) {
        outcome.complete(response);
        return;
      }
      outcome.completeExceptionally(rpcController.getFailed());
    });
    return outcome;
  }

  /**
   * Invokes a coprocessor endpoint through a channel backed by a server caller bound to
   * {@code serverName}.
   * @param stubMaker  builds the service stub from the RPC channel
   * @param callable   performs the call on the stub
   * @param serverName the server the caller is bound to
   * @return a future completed with the response, or failed with the controller's failure when the
   *         controller reports one
   */
  @Override
  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, ServerName serverName) {
    CompletableFuture<R> outcome = new CompletableFuture<>();
    S serviceStub = stubMaker.apply(new RegionServerCoprocessorRpcChannelImpl(
      this.<Message> newServerCaller().serverName(serverName)));
    ClientCoprocessorRpcController rpcController = new ClientCoprocessorRpcController();
    callable.call(serviceStub, rpcController, response -> {
      if (!rpcController.failed()) {
        outcome.complete(response);
        return;
      }
      outcome.completeExceptionally(rpcController.getFailed());
    });
    return outcome;
  }

  /**
   * Sends a {@code clearDeadServers} request for {@code servers} to the master and returns the
   * server names listed in the response.
   */
  @Override
  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
    return this.<List<ServerName>> newMasterCaller()
      .action((ctrl, masterStub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
        List<ServerName>> call(ctrl, masterStub,
          RequestConverter.buildClearDeadServersRequest(servers),
          (svc, rpcCtrl, rpcReq, rpcDone) -> svc.clearDeadServers(rpcCtrl, rpcReq, rpcDone),
          response -> ProtobufUtil.toServerNameList(response.getServerNameList())))
      .call();
  }

  <T>
ServerRequestCallerBuilder<T> newServerCaller() { 3491 return this.connection.callerFactory.<T> serverRequest() 3492 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3493 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3494 .pause(pauseNs, TimeUnit.NANOSECONDS) 3495 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3496 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3497 } 3498 3499 @Override 3500 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3501 if (tableName == null) { 3502 return failedFuture(new IllegalArgumentException("Table name is null")); 3503 } 3504 CompletableFuture<Void> future = new CompletableFuture<>(); 3505 addListener(tableExists(tableName), (exist, err) -> { 3506 if (err != null) { 3507 future.completeExceptionally(err); 3508 return; 3509 } 3510 if (!exist) { 3511 future.completeExceptionally(new TableNotFoundException( 3512 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3513 return; 3514 } 3515 addListener(getTableSplits(tableName), (splits, err1) -> { 3516 if (err1 != null) { 3517 future.completeExceptionally(err1); 3518 } else { 3519 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3520 if (err2 != null) { 3521 future.completeExceptionally(err2); 3522 } else { 3523 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3524 if (err3 != null) { 3525 future.completeExceptionally(err3); 3526 } else { 3527 future.complete(result3); 3528 } 3529 }); 3530 } 3531 }); 3532 } 3533 }); 3534 }); 3535 return future; 3536 } 3537 3538 @Override 3539 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3540 if (tableName == null) { 3541 return failedFuture(new IllegalArgumentException("Table name is null")); 3542 } 3543 CompletableFuture<Void> future = new CompletableFuture<>(); 3544 addListener(tableExists(tableName), (exist, err) -> { 3545 if (err != null) { 3546 
future.completeExceptionally(err);
        return;
      }
      if (!exist) {
        future.completeExceptionally(new TableNotFoundException(
          "Table '" + tableName.getNameAsString() + "' does not exists."));
        return;
      }
      addListener(setTableReplication(tableName, false), (result, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
        } else {
          future.complete(result);
        }
      });
    });
    return future;
  }

  /**
   * Computes the split keys of {@code tableName} from its default-replica regions.
   * <p>
   * The split keys are the start keys of every region except the first one returned by
   * {@code getRegions}.
   * @param tableName the table whose split keys are wanted
   * @return a future holding the split keys, or {@code null} when the table has at most one
   *         default-replica region and therefore no split point; fails with the error raised by
   *         the region lookup
   */
  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
    CompletableFuture<byte[][]> future = new CompletableFuture<>();
    addListener(
      getRegions(tableName).thenApply(regions -> regions.stream()
        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
      (regions, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
          return;
        }
        // "<= 1" rather than "== 1": with an empty list the array size below would be -1 and the
        // allocation would throw inside this callback instead of completing the future.
        if (regions.size() <= 1) {
          future.complete(null);
          return;
        }
        byte[][] splits = new byte[regions.size() - 1][];
        for (int i = 1; i < regions.size(); i++) {
          splits[i - 1] = regions.get(i).getStartKey();
        }
        future.complete(splits);
      });
    return future;
  }

  /**
   * Connect to peer and check the table descriptor on peer:
   * <ol>
   * <li>Create the same table on peer when not exist.</li>
   * <li>Throw an exception if the table already has replication enabled on any of the column
   * families.</li>
   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
   * </ol>
   * @param tableName name of the table to sync to the peer
   * @param splits table split keys
   */
  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
    byte[][] splits) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(listReplicationPeers(), (peers, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
return; 3606 } 3607 if (peers == null || peers.size() <= 0) { 3608 future.completeExceptionally( 3609 new IllegalArgumentException("Found no peer cluster for replication.")); 3610 return; 3611 } 3612 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3613 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3614 .forEach(peer -> { 3615 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3616 }); 3617 addListener( 3618 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3619 (result, err2) -> { 3620 if (err2 != null) { 3621 future.completeExceptionally(err2); 3622 } else { 3623 future.complete(result); 3624 } 3625 }); 3626 }); 3627 return future; 3628 } 3629 3630 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3631 ReplicationPeerDescription peer) { 3632 Configuration peerConf = null; 3633 try { 3634 peerConf = 3635 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3636 } catch (IOException e) { 3637 return failedFuture(e); 3638 } 3639 CompletableFuture<Void> future = new CompletableFuture<>(); 3640 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3641 if (err != null) { 3642 future.completeExceptionally(err); 3643 return; 3644 } 3645 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3646 if (err1 != null) { 3647 future.completeExceptionally(err1); 3648 return; 3649 } 3650 AsyncAdmin peerAdmin = conn.getAdmin(); 3651 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3652 if (err2 != null) { 3653 future.completeExceptionally(err2); 3654 return; 3655 } 3656 if (!exist) { 3657 CompletableFuture<Void> createTableFuture = null; 3658 if (splits == null) { 3659 createTableFuture = peerAdmin.createTable(tableDesc); 3660 } else { 3661 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3662 } 3663 addListener(createTableFuture, (result, err3) 
-> { 3664 if (err3 != null) { 3665 future.completeExceptionally(err3); 3666 } else { 3667 future.complete(result); 3668 } 3669 }); 3670 } else { 3671 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3672 (result, err4) -> { 3673 if (err4 != null) { 3674 future.completeExceptionally(err4); 3675 } else { 3676 future.complete(result); 3677 } 3678 }); 3679 } 3680 }); 3681 }); 3682 }); 3683 return future; 3684 } 3685 3686 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3687 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3688 CompletableFuture<Void> future = new CompletableFuture<>(); 3689 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3690 if (err != null) { 3691 future.completeExceptionally(err); 3692 return; 3693 } 3694 if (peerTableDesc == null) { 3695 future.completeExceptionally( 3696 new IllegalArgumentException("Failed to get table descriptor for table " 3697 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3698 return; 3699 } 3700 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3701 future.completeExceptionally(new IllegalArgumentException( 3702 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3703 + ", but the table descriptors are not same when compared with source cluster." 3704 + " Thus can not enable the table's replication switch.")); 3705 return; 3706 } 3707 future.complete(null); 3708 }); 3709 return future; 3710 } 3711 3712 /** 3713 * Set the table's replication switch if the table's replication switch is already not set. 
3714 * @param tableName name of the table 3715 * @param enableRep is replication switch enable or disable 3716 */ 3717 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3718 CompletableFuture<Void> future = new CompletableFuture<>(); 3719 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3720 if (err != null) { 3721 future.completeExceptionally(err); 3722 return; 3723 } 3724 if (!tableDesc.matchReplicationScope(enableRep)) { 3725 int scope = 3726 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3727 TableDescriptor newTableDesc = 3728 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3729 addListener(modifyTable(newTableDesc), (result, err2) -> { 3730 if (err2 != null) { 3731 future.completeExceptionally(err2); 3732 } else { 3733 future.complete(result); 3734 } 3735 }); 3736 } else { 3737 future.complete(null); 3738 } 3739 }); 3740 return future; 3741 } 3742 3743 @Override 3744 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3745 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3746 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3747 if (err != null) { 3748 future.completeExceptionally(err); 3749 return; 3750 } 3751 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3752 locations.stream().filter(l -> l.getRegion() != null) 3753 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3754 .collect(Collectors.groupingBy(l -> l.getServerName(), 3755 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3756 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3757 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3758 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3759 futures 3760 .add(clearBlockCache(entry.getKey(), 
entry.getValue()).whenComplete((stats, err2) -> { 3761 if (err2 != null) { 3762 future.completeExceptionally(unwrapCompletionException(err2)); 3763 } else { 3764 aggregator.append(stats); 3765 } 3766 })); 3767 } 3768 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3769 (ret, err3) -> { 3770 if (err3 != null) { 3771 future.completeExceptionally(unwrapCompletionException(err3)); 3772 } else { 3773 future.complete(aggregator.sum()); 3774 } 3775 }); 3776 }); 3777 return future; 3778 } 3779 3780 @Override 3781 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 3782 boolean preserveSplits) { 3783 CompletableFuture<Void> future = new CompletableFuture<>(); 3784 addListener(tableExists(tableName), (exist, err) -> { 3785 if (err != null) { 3786 future.completeExceptionally(err); 3787 return; 3788 } 3789 if (!exist) { 3790 future.completeExceptionally(new TableNotFoundException(tableName)); 3791 return; 3792 } 3793 addListener(tableExists(newTableName), (exist1, err1) -> { 3794 if (err1 != null) { 3795 future.completeExceptionally(err1); 3796 return; 3797 } 3798 if (exist1) { 3799 future.completeExceptionally(new TableExistsException(newTableName)); 3800 return; 3801 } 3802 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 3803 if (err2 != null) { 3804 future.completeExceptionally(err2); 3805 return; 3806 } 3807 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 3808 if (preserveSplits) { 3809 addListener(getTableSplits(tableName), (splits, err3) -> { 3810 if (err3 != null) { 3811 future.completeExceptionally(err3); 3812 } else { 3813 addListener( 3814 splits != null ? 
createTable(newTableDesc, splits) : createTable(newTableDesc), 3815 (result, err4) -> { 3816 if (err4 != null) { 3817 future.completeExceptionally(err4); 3818 } else { 3819 future.complete(result); 3820 } 3821 }); 3822 } 3823 }); 3824 } else { 3825 addListener(createTable(newTableDesc), (result, err5) -> { 3826 if (err5 != null) { 3827 future.completeExceptionally(err5); 3828 } else { 3829 future.complete(result); 3830 } 3831 }); 3832 } 3833 }); 3834 }); 3835 }); 3836 return future; 3837 } 3838 3839 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 3840 List<RegionInfo> hris) { 3841 return this.<CacheEvictionStats> newAdminCaller() 3842 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 3843 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 3844 RequestConverter.buildClearRegionBlockCacheRequest(hris), 3845 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 3846 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 3847 .serverName(serverName).call(); 3848 } 3849 3850 @Override 3851 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 3852 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3853 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 3854 Boolean> call(controller, stub, 3855 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 3856 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 3857 resp -> resp.getPreviousRpcThrottleEnabled())) 3858 .call(); 3859 return future; 3860 } 3861 3862 @Override 3863 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 3864 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3865 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 3866 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 3867 (s, c, req, done) -> 
s.isRpcThrottleEnabled(c, req, done), 3868 resp -> resp.getRpcThrottleEnabled())) 3869 .call(); 3870 return future; 3871 } 3872 3873 @Override 3874 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 3875 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3876 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 3877 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 3878 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 3879 .build(), 3880 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 3881 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 3882 .call(); 3883 return future; 3884 } 3885 3886 @Override 3887 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 3888 return this.<Map<TableName, Long>> newMasterCaller() 3889 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 3890 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 3891 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 3892 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 3893 resp -> resp.getSizesList().stream().collect(Collectors 3894 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 3895 .call(); 3896 } 3897 3898 @Override 3899 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 3900 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 3901 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 3902 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 3903 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 3904 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 3905 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 3906 resp -> resp.getSnapshotsList().stream() 3907 .collect(Collectors.toMap(snapshot -> 
ProtobufUtil.toTableName(snapshot.getTableName()), 3908 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 3909 .serverName(serverName).call(); 3910 } 3911 3912 private CompletableFuture<SpaceQuotaSnapshot> 3913 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 3914 return this.<SpaceQuotaSnapshot> newMasterCaller() 3915 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 3916 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 3917 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 3918 .call(); 3919 } 3920 3921 @Override 3922 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 3923 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 3924 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 3925 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3926 } 3927 3928 @Override 3929 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 3930 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 3931 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 3932 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 3933 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3934 } 3935 3936 @Override 3937 public CompletableFuture<Void> grant(UserPermission userPermission, 3938 boolean mergeExistingPermissions) { 3939 return this.<Void> newMasterCaller() 3940 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 3941 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 3942 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 3943 .call(); 3944 } 3945 3946 @Override 3947 public CompletableFuture<Void> 
revoke(UserPermission userPermission) { 3948 return this.<Void> newMasterCaller() 3949 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 3950 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 3951 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 3952 .call(); 3953 } 3954 3955 @Override 3956 public CompletableFuture<List<UserPermission>> 3957 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 3958 return this.<List<UserPermission>> newMasterCaller() 3959 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 3960 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 3961 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 3962 (s, c, req, done) -> s.getUserPermissions(c, req, done), 3963 resp -> resp.getUserPermissionList().stream() 3964 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 3965 .collect(Collectors.toList()))) 3966 .call(); 3967 } 3968 3969 @Override 3970 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 3971 List<Permission> permissions) { 3972 return this.<List<Boolean>> newMasterCaller() 3973 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 3974 List<Boolean>> call(controller, stub, 3975 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 3976 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 3977 resp -> resp.getHasUserPermissionList())) 3978 .call(); 3979 } 3980 3981 @Override 3982 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 3983 return this.<Boolean> newMasterCaller() 3984 .action((controller, stub) -> this.call(controller, stub, 3985 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 3986 MasterService.Interface::switchSnapshotCleanup, 3987 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 3988 .call(); 
3989 } 3990 3991 @Override 3992 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 3993 return this.<Boolean> newMasterCaller() 3994 .action((controller, stub) -> this.call(controller, stub, 3995 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 3996 MasterService.Interface::isSnapshotCleanupEnabled, 3997 IsSnapshotCleanupEnabledResponse::getEnabled)) 3998 .call(); 3999 } 4000 4001 @Override 4002 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4003 return this.<Void> newMasterCaller() 4004 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4005 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4006 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4007 .call(); 4008 } 4009 4010 @Override 4011 public CompletableFuture<Void> addRSGroup(String groupName) { 4012 return this.<Void> newMasterCaller() 4013 .action( 4014 ((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(controller, 4015 stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4016 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))) 4017 .call(); 4018 } 4019 4020 @Override 4021 public CompletableFuture<Void> removeRSGroup(String groupName) { 4022 return this.<Void> newMasterCaller() 4023 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4024 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4025 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4026 .call(); 4027 } 4028 4029 @Override 4030 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4031 BalanceRequest request) { 4032 return this.<BalanceResponse> newMasterCaller() 4033 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4034 BalanceResponse> call(controller, stub, 4035 
ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4036 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4037 .call(); 4038 } 4039 4040 @Override 4041 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4042 return this.<List<RSGroupInfo>> newMasterCaller() 4043 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4044 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4045 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4046 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4047 .call(); 4048 } 4049 4050 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4051 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4052 final String logType) { 4053 if (CollectionUtils.isEmpty(serverNames)) { 4054 return CompletableFuture.completedFuture(Collections.emptyList()); 4055 } 4056 return CompletableFuture.supplyAsync(() -> serverNames 4057 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4058 filterParams, limit, logType)) 4059 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4060 } 4061 4062 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4063 Map<String, Object> filterParams, int limit, String logType) { 4064 return this.<List<LogEntry>> newAdminCaller() 4065 .action((controller, stub) -> this.adminCall(controller, stub, 4066 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4067 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4068 .serverName(serverName).call(); 4069 } 4070 4071 @Override 4072 public CompletableFuture<List<Boolean>> 4073 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4074 if (CollectionUtils.isEmpty(serverNames)) { 4075 return 
CompletableFuture.completedFuture(Collections.emptyList()); 4076 } 4077 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4078 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4079 return convertToFutureOfList(clearSlowLogResponseList); 4080 } 4081 4082 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4083 return this.<Boolean> newAdminCaller() 4084 .action(((controller, stub) -> this.adminCall(controller, stub, 4085 RequestConverter.buildClearSlowLogResponseRequest(), 4086 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))) 4087 .serverName(serverName).call(); 4088 } 4089 4090 private static <T> CompletableFuture<List<T>> 4091 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4092 CompletableFuture<Void> allDoneFuture = 4093 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4094 return allDoneFuture 4095 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4096 } 4097 4098 @Override 4099 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4100 return this.<List<TableName>> newMasterCaller() 4101 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4102 List<TableName>> call(controller, stub, 4103 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4104 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4105 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4106 .call(); 4107 } 4108 4109 @Override 4110 public CompletableFuture<Pair<List<String>, List<TableName>>> 4111 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4112 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4113 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4114 
GetConfiguredNamespacesAndTablesInRSGroupResponse, 4115 Pair<List<String>, List<TableName>>> call(controller, stub, 4116 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4117 .build(), 4118 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4119 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4120 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4121 .call(); 4122 } 4123 4124 @Override 4125 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4126 return this.<RSGroupInfo> newMasterCaller() 4127 .action( 4128 ((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse, 4129 RSGroupInfo> call(controller, stub, GetRSGroupInfoOfServerRequest.newBuilder() 4130 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4131 .setPort(hostPort.getPort()).build()) 4132 .build(), (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4133 resp -> resp.hasRSGroupInfo() 4134 ? 
ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) 4135 : null))) 4136 .call(); 4137 } 4138 4139 @Override 4140 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4141 return this.<Void> newMasterCaller() 4142 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4143 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4144 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4145 .call(); 4146 } 4147 4148 @Override 4149 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4150 CompletableFuture<Void> future = new CompletableFuture<>(); 4151 for (TableName tableName : tables) { 4152 addListener(tableExists(tableName), (exist, err) -> { 4153 if (err != null) { 4154 future.completeExceptionally(err); 4155 return; 4156 } 4157 if (!exist) { 4158 future.completeExceptionally(new TableNotFoundException(tableName)); 4159 return; 4160 } 4161 }); 4162 } 4163 addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> { 4164 if (err != null) { 4165 future.completeExceptionally(err); 4166 return; 4167 } 4168 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4169 future.complete(null); 4170 return; 4171 } 4172 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4173 for (TableDescriptor td : tableDescriptions) { 4174 newTableDescriptors 4175 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4176 } 4177 addListener( 4178 CompletableFuture.allOf( 4179 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4180 (v, e) -> { 4181 if (e != null) { 4182 future.completeExceptionally(e); 4183 } else { 4184 future.complete(v); 4185 } 4186 }); 4187 })); 4188 return future; 4189 } 4190 4191 @Override 4192 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4193 return this.<RSGroupInfo> newMasterCaller() 4194 
.action(((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4195 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4196 GetRSGroupInfoOfTableRequest.newBuilder() 4197 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4198 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4199 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))) 4200 .call(); 4201 } 4202 4203 @Override 4204 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4205 return this.<RSGroupInfo> newMasterCaller() 4206 .action(((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4207 RSGroupInfo> call(controller, stub, 4208 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4209 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4210 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))) 4211 .call(); 4212 } 4213 4214 @Override 4215 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4216 return this.<Void> newMasterCaller() 4217 .action(((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4218 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4219 .setNewRsgroupName(newName).build(), 4220 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))) 4221 .call(); 4222 } 4223 4224 @Override 4225 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4226 Map<String, String> configuration) { 4227 UpdateRSGroupConfigRequest.Builder request = 4228 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4229 if (configuration != null) { 4230 configuration.entrySet().forEach(e -> request.addConfiguration( 4231 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4232 } 4233 return this.<Void> newMasterCaller() 4234 .action(((controller, stub) -> this.<UpdateRSGroupConfigRequest, 
UpdateRSGroupConfigResponse, 4235 Void> call(controller, stub, request.build(), 4236 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))) 4237 .call(); 4238 } 4239 4240 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4241 return this.<List<LogEntry>> newMasterCaller() 4242 .action((controller, stub) -> this.call(controller, stub, 4243 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4244 ProtobufUtil::toBalancerDecisionResponse)) 4245 .call(); 4246 } 4247 4248 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4249 return this.<List<LogEntry>> newMasterCaller() 4250 .action((controller, stub) -> this.call(controller, stub, 4251 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4252 ProtobufUtil::toBalancerRejectionResponse)) 4253 .call(); 4254 } 4255 4256 @Override 4257 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4258 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4259 if (logType == null || serverType == null) { 4260 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4261 } 4262 switch (logType) { 4263 case "SLOW_LOG": 4264 case "LARGE_LOG": 4265 if (ServerType.MASTER.equals(serverType)) { 4266 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4267 } 4268 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4269 case "BALANCER_DECISION": 4270 if (ServerType.REGION_SERVER.equals(serverType)) { 4271 throw new IllegalArgumentException( 4272 "Balancer Decision logs are not maintained by HRegionServer"); 4273 } 4274 return getBalancerDecisions(limit); 4275 case "BALANCER_REJECTION": 4276 if (ServerType.REGION_SERVER.equals(serverType)) { 4277 throw new IllegalArgumentException( 4278 "Balancer Rejection logs are not maintained by HRegionServer"); 4279 
} 4280 return getBalancerRejections(limit); 4281 default: 4282 return CompletableFuture.completedFuture(Collections.emptyList()); 4283 } 4284 } 4285 4286 @Override 4287 public CompletableFuture<Void> flushMasterStore() { 4288 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4289 return this.<Void> newMasterCaller() 4290 .action(((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4291 Void> call(controller, stub, request.build(), 4292 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))) 4293 .call(); 4294 } 4295}