001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.net.URI; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.Set; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.function.BiConsumer; 043import java.util.function.Consumer; 044import java.util.function.Function; 045import java.util.function.Supplier; 046import java.util.regex.Pattern; 
047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 052import org.apache.hadoop.hbase.CatalogFamilyFormat; 053import org.apache.hadoop.hbase.ClientMetaTableAccessor; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.HRegionLocation; 060import org.apache.hadoop.hbase.NamespaceDescriptor; 061import org.apache.hadoop.hbase.RegionLocations; 062import org.apache.hadoop.hbase.RegionMetrics; 063import org.apache.hadoop.hbase.RegionMetricsBuilder; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableExistsException; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotDisabledException; 068import org.apache.hadoop.hbase.TableNotEnabledException; 069import org.apache.hadoop.hbase.TableNotFoundException; 070import org.apache.hadoop.hbase.UnknownRegionException; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.Scan.ReadType; 075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 076import org.apache.hadoop.hbase.client.replication.TableCFs; 077import org.apache.hadoop.hbase.client.security.SecurityCapability; 078import org.apache.hadoop.hbase.exceptions.DeserializationException; 079import org.apache.hadoop.hbase.ipc.HBaseRpcController; 080import 
org.apache.hadoop.hbase.net.Address; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 086import org.apache.hadoop.hbase.replication.ReplicationException; 087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 089import org.apache.hadoop.hbase.replication.SyncReplicationState; 090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 092import org.apache.hadoop.hbase.security.access.Permission; 093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 094import org.apache.hadoop.hbase.security.access.UserPermission; 095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 100import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 101import org.apache.hadoop.hbase.util.Pair; 102import org.apache.hadoop.hbase.util.Strings; 103import org.apache.yetus.audience.InterfaceAudience; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 108import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 109import org.apache.hbase.thirdparty.com.google.protobuf.Message; 110import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 111import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 112import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 113import 
org.apache.hbase.thirdparty.io.netty.util.Timeout; 114import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 116 117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 118import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 135import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 156import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 176import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 196import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 216import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 235import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 254import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 274import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 294import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 314import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 332import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 350import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 357import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 358import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 359import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 360import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 361import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 362import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 363import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 364 365/** 366 * The implementation of AsyncAdmin. 367 * <p> 368 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 369 * be finished inside the rpc framework thread, which means that the callbacks registered to the 370 * {@link CompletableFuture} will also be executed inside the rpc framework thread. 
So users who use 371 * this class should not try to do time consuming tasks in the callbacks. 372 * @since 2.0.0 373 * @see AsyncHBaseAdmin 374 * @see AsyncConnection#getAdmin() 375 * @see AsyncConnection#getAdminBuilder() 376 */ 377@InterfaceAudience.Private 378class RawAsyncHBaseAdmin implements AsyncAdmin { 379 380 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 381 382 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 383 384 private final AsyncConnectionImpl connection; 385 386 private final HashedWheelTimer retryTimer; 387 388 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 389 390 private final long rpcTimeoutNs; 391 392 private final long operationTimeoutNs; 393 394 private final long pauseNs; 395 396 private final long pauseNsForServerOverloaded; 397 398 private final int maxAttempts; 399 400 private final int startLogErrorsCnt; 401 402 private final NonceGenerator ng; 403 404 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 405 AsyncAdminBuilderBase builder) { 406 this.connection = connection; 407 this.retryTimer = retryTimer; 408 this.metaTable = connection.getTable(META_TABLE_NAME); 409 this.rpcTimeoutNs = builder.rpcTimeoutNs; 410 this.operationTimeoutNs = builder.operationTimeoutNs; 411 this.pauseNs = builder.pauseNs; 412 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 413 LOG.warn( 414 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 415 + " the normal pause value {} ms, use the greater one instead", 416 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 417 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 418 this.pauseNsForServerOverloaded = builder.pauseNs; 419 } else { 420 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 421 } 422 this.maxAttempts = builder.maxAttempts; 423 this.startLogErrorsCnt = builder.startLogErrorsCnt; 424 this.ng = 
connection.getNonceGenerator(); 425 } 426 427 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 428 return this.connection.callerFactory.<T> masterRequest() 429 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 430 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 431 .pause(pauseNs, TimeUnit.NANOSECONDS) 432 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 433 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 434 } 435 436 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 437 return this.connection.callerFactory.<T> adminRequest() 438 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 439 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 440 .pause(pauseNs, TimeUnit.NANOSECONDS) 441 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 442 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 443 } 444 445 @FunctionalInterface 446 private interface MasterRpcCall<RESP, REQ> { 447 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 448 RpcCallback<RESP> done); 449 } 450 451 @FunctionalInterface 452 private interface AdminRpcCall<RESP, REQ> { 453 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 454 RpcCallback<RESP> done); 455 } 456 457 @FunctionalInterface 458 private interface Converter<D, S> { 459 D convert(S src) throws IOException; 460 } 461 462 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 463 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 464 Converter<RESP, PRESP> respConverter) { 465 CompletableFuture<RESP> future = new CompletableFuture<>(); 466 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 467 468 @Override 469 public void run(PRESP resp) { 470 if (controller.failed()) { 471 future.completeExceptionally(controller.getFailed()); 472 } else { 473 try { 474 future.complete(respConverter.convert(resp)); 475 } catch 
(IOException e) {
            future.completeExceptionally(e);
          }
        }
      }
    });
    return future;
  }

  /**
   * Issues one admin rpc and exposes its outcome as a future: a controller failure or a
   * conversion IOException completes the future exceptionally, otherwise it completes with the
   * converted response.
   */
  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
    Converter<RESP, PRESP> respConverter) {
    CompletableFuture<RESP> result = new CompletableFuture<>();
    rpcCall.call(stub, controller, preq, resp -> {
      if (controller.failed()) {
        result.completeExceptionally(controller.getFailed());
        return;
      }
      try {
        result.complete(respConverter.convert(resp));
      } catch (IOException e) {
        result.completeExceptionally(e);
      }
    });
    return result;
  }

  /**
   * short-circuit call for
   * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)}
   * that ignores the procedure result
   */
  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
    ProcedureBiConsumer<Void> consumer) {
    Converter<Void, ByteString> discardResult = result -> null;
    return procedureCall(preq, rpcCall, respConverter, discardResult, consumer);
  }

  /**
   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
   * ProcedureBiConsumer) that does not set a priority on the request
   */
  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq,
    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
    Consumer<MasterRequestCallerBuilder<?>> leavePriorityUnset = b -> {
    };
    return procedureCall(leavePriorityUnset, preq, rpcCall, respConverter, resultConverter,
      consumer);
  }

  /**
   * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter,
   * ProcedureBiConsumer) by ignoring procedure result
   */
  private <PREQ,
PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 533 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 534 ProcedureBiConsumer<Void> consumer) { 535 return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer); 536 } 537 538 /** 539 * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, 540 * ProcedureBiConsumer) by skip setting priority for request 541 */ 542 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq, 543 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 544 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 545 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter, 546 consumer); 547 } 548 549 /** 550 * @param <PREQ> type of request 551 * @param <PRESP> type of response 552 * @param <PRES> type of procedure call result 553 * @param prioritySetter prioritySetter set priority by table for request 554 * @param preq procedure call request 555 * @param rpcCall procedure rpc call 556 * @param respConverter extract proc id from procedure call response 557 * @param resultConverter extract result from procedure call result 558 * @param consumer action performs on result 559 * @return procedure call result, null if procedure is void 560 */ 561 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall( 562 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 563 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 564 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 565 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller() 566 .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter)); 567 prioritySetter.accept(builder); 568 CompletableFuture<Long> procFuture = builder.call(); 569 
CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter); 570 addListener(future, consumer); 571 return future; 572 } 573 574 @Override 575 public CompletableFuture<Boolean> tableExists(TableName tableName) { 576 if (TableName.isMetaTableName(tableName)) { 577 return CompletableFuture.completedFuture(true); 578 } 579 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 580 } 581 582 @Override 583 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 584 return getTableDescriptors( 585 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 586 } 587 588 /** 589 * {@link #listTableDescriptors(boolean)} 590 */ 591 @Override 592 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 593 boolean includeSysTables) { 594 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 595 + "use listTableDescriptors(boolean) instead"); 596 return getTableDescriptors( 597 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 598 } 599 600 @Override 601 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 602 Preconditions.checkNotNull(tableNames, "tableNames is null. 
If you don't specify tableNames, " 603 + "use listTableDescriptors(boolean) instead"); 604 if (tableNames.isEmpty()) { 605 return CompletableFuture.completedFuture(Collections.emptyList()); 606 } 607 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 608 } 609 610 private CompletableFuture<List<TableDescriptor>> 611 getTableDescriptors(GetTableDescriptorsRequest request) { 612 return this.<List<TableDescriptor>> newMasterCaller() 613 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 614 List<TableDescriptor>> call(controller, stub, request, 615 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 616 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 617 .call(); 618 } 619 620 @Override 621 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 622 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 623 } 624 625 @Override 626 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 627 boolean includeSysTables) { 628 Preconditions.checkNotNull(pattern, 629 "pattern is null. 
If you don't specify a pattern, use listTableNames(boolean) instead"); 630 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 631 } 632 633 @Override 634 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 635 return this.<List<TableName>> newMasterCaller() 636 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 637 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 638 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 639 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 640 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 641 .call(); 642 } 643 644 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 645 return this.<List<TableName>> newMasterCaller() 646 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 647 List<TableName>> call(controller, stub, request, 648 (s, c, req, done) -> s.getTableNames(c, req, done), 649 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 650 .call(); 651 } 652 653 @Override 654 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 655 return this.<List<TableDescriptor>> newMasterCaller() 656 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 657 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 658 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 659 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 660 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 661 .call(); 662 } 663 664 @Override 665 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 666 return this.<List<TableDescriptor>> newMasterCaller() 667 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 668 
ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 669 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 670 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 671 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 672 .call(); 673 } 674 675 @Override 676 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 677 return this.<List<TableName>> newMasterCaller() 678 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 679 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 680 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 681 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 682 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 683 .call(); 684 } 685 686 @Override 687 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 688 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 689 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 690 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 691 List<TableSchema>> call(controller, stub, 692 RequestConverter.buildGetTableDescriptorsRequest(tableName), 693 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 694 (resp) -> resp.getTableSchemaList())) 695 .call(), (tableSchemas, error) -> { 696 if (error != null) { 697 future.completeExceptionally(error); 698 return; 699 } 700 if (!tableSchemas.isEmpty()) { 701 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 702 } else { 703 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 704 } 705 }); 706 return future; 707 } 708 709 @Override 710 public CompletableFuture<Void> createTable(TableDescriptor desc) { 711 return createTable(desc.getTableName(), 712 RequestConverter.buildCreateTableRequest(desc, null, 
ng.getNonceGroup(), ng.newNonce())); 713 } 714 715 @Override 716 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 717 int numRegions) { 718 try { 719 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 720 } catch (IllegalArgumentException e) { 721 return failedFuture(e); 722 } 723 } 724 725 @Override 726 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 727 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 728 + " use createTable(TableDescriptor) instead"); 729 try { 730 verifySplitKeys(splitKeys); 731 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 732 splitKeys, ng.getNonceGroup(), ng.newNonce())); 733 } catch (IllegalArgumentException e) { 734 return failedFuture(e); 735 } 736 } 737 738 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 739 Preconditions.checkNotNull(tableName, "table name is null"); 740 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 741 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 742 new CreateTableProcedureBiConsumer(tableName)); 743 } 744 745 @Override 746 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 747 return modifyTable(desc, true); 748 } 749 750 @Override 751 public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) { 752 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 753 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 754 ng.newNonce(), reopenRegions), 755 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 756 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 757 } 758 759 @Override 760 public CompletableFuture<Void> reopenTableRegions(TableName tableName) { 761 return 
reopenTableRegions(tableName, Collections.emptyList()); 762 } 763 764 @Override 765 public CompletableFuture<Void> reopenTableRegions(TableName tableName, List<RegionInfo> regions) { 766 List<byte[]> regionNames = regions.stream().map(RegionInfo::getRegionName).toList(); 767 return this.<ReopenTableRegionsRequest, ReopenTableRegionsResponse> procedureCall(tableName, 768 RequestConverter.buildReopenTableRegionsRequest(tableName, regionNames, ng.getNonceGroup(), 769 ng.newNonce()), 770 (s, c, req, done) -> s.reopenTableRegions(c, req, done), (resp) -> resp.getProcId(), 771 new ReopenTableRegionsProcedureBiConsumer(this, tableName)); 772 } 773 774 @Override 775 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 776 return this.<ModifyTableStoreFileTrackerRequest, 777 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 778 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 779 ng.getNonceGroup(), ng.newNonce()), 780 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 781 (resp) -> resp.getProcId(), 782 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 783 } 784 785 @Override 786 public CompletableFuture<Void> deleteTable(TableName tableName) { 787 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 788 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 789 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 790 new DeleteTableProcedureBiConsumer(tableName)); 791 } 792 793 @Override 794 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 795 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 796 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 797 ng.newNonce()), 798 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 799 new 
TruncateTableProcedureBiConsumer(tableName)); 800 } 801 802 @Override 803 public CompletableFuture<Void> enableTable(TableName tableName) { 804 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 805 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 806 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 807 new EnableTableProcedureBiConsumer(tableName)); 808 } 809 810 @Override 811 public CompletableFuture<Void> disableTable(TableName tableName) { 812 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 813 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 814 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 815 new DisableTableProcedureBiConsumer(tableName)); 816 } 817 818 /** 819 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 820 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 821 * targetState). 
822 */ 823 private static CompletableFuture<Boolean> completeCheckTableState( 824 CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error, 825 TableState.State targetState, TableName tableName) { 826 if (error != null) { 827 future.completeExceptionally(error); 828 } else { 829 if (tableState.isPresent()) { 830 future.complete(tableState.get().inStates(targetState)); 831 } else { 832 future.completeExceptionally(new TableNotFoundException(tableName)); 833 } 834 } 835 return future; 836 } 837 838 @Override 839 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 840 if (TableName.isMetaTableName(tableName)) { 841 return CompletableFuture.completedFuture(true); 842 } 843 CompletableFuture<Boolean> future = new CompletableFuture<>(); 844 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 845 (tableState, error) -> { 846 completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName); 847 }); 848 return future; 849 } 850 851 @Override 852 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 853 if (TableName.isMetaTableName(tableName)) { 854 return CompletableFuture.completedFuture(false); 855 } 856 CompletableFuture<Boolean> future = new CompletableFuture<>(); 857 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 858 (tableState, error) -> { 859 completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName); 860 }); 861 return future; 862 } 863 864 @Override 865 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 866 if (TableName.isMetaTableName(tableName)) { 867 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 868 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 869 } 870 CompletableFuture<Boolean> future = new CompletableFuture<>(); 871 addListener(isTableEnabled(tableName), (enabled, error) -> { 872 if (error != 
null) { 873 if (error instanceof TableNotFoundException) { 874 future.complete(false); 875 } else { 876 future.completeExceptionally(error); 877 } 878 return; 879 } 880 if (!enabled) { 881 future.complete(false); 882 } else { 883 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 884 (locations, error1) -> { 885 if (error1 != null) { 886 future.completeExceptionally(error1); 887 return; 888 } 889 List<HRegionLocation> notDeployedRegions = locations.stream() 890 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 891 if (notDeployedRegions.size() > 0) { 892 if (LOG.isDebugEnabled()) { 893 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 894 } 895 future.complete(false); 896 return; 897 } 898 future.complete(true); 899 }); 900 } 901 }); 902 return future; 903 } 904 905 @Override 906 public CompletableFuture<Void> addColumnFamily(TableName tableName, 907 ColumnFamilyDescriptor columnFamily) { 908 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 909 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 910 ng.newNonce()), 911 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 912 new AddColumnFamilyProcedureBiConsumer(tableName)); 913 } 914 915 @Override 916 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 917 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 918 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 919 ng.newNonce()), 920 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 921 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 922 } 923 924 @Override 925 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 926 ColumnFamilyDescriptor columnFamily) { 927 return this.<ModifyColumnRequest, ModifyColumnResponse> 
procedureCall(tableName, 928 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 929 ng.newNonce()), 930 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 931 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 932 } 933 934 @Override 935 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 936 byte[] family, String dstSFT) { 937 return this.<ModifyColumnStoreFileTrackerRequest, 938 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 939 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 940 ng.getNonceGroup(), ng.newNonce()), 941 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 942 (resp) -> resp.getProcId(), 943 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 944 } 945 946 @Override 947 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 948 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 949 RequestConverter.buildCreateNamespaceRequest(descriptor), 950 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 951 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 952 } 953 954 @Override 955 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 956 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 957 RequestConverter.buildModifyNamespaceRequest(descriptor), 958 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 959 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 960 } 961 962 @Override 963 public CompletableFuture<Void> deleteNamespace(String name) { 964 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 965 RequestConverter.buildDeleteNamespaceRequest(name), 966 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 967 new 
DeleteNamespaceProcedureBiConsumer(name)); 968 } 969 970 @Override 971 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 972 return this.<NamespaceDescriptor> newMasterCaller() 973 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 974 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 975 RequestConverter.buildGetNamespaceDescriptorRequest(name), 976 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 977 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 978 .call(); 979 } 980 981 @Override 982 public CompletableFuture<List<String>> listNamespaces() { 983 return this.<List<String>> newMasterCaller() 984 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 985 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 986 (s, c, req, done) -> s.listNamespaces(c, req, done), 987 (resp) -> resp.getNamespaceNameList())) 988 .call(); 989 } 990 991 @Override 992 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 993 return this.<List<NamespaceDescriptor>> newMasterCaller() 994 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 995 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 996 ListNamespaceDescriptorsRequest.newBuilder().build(), 997 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 998 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 999 .call(); 1000 } 1001 1002 @Override 1003 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 1004 return this.<List<RegionInfo>> newAdminCaller() 1005 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 1006 List<RegionInfo>> adminCall(controller, stub, 1007 RequestConverter.buildGetOnlineRegionRequest(), 1008 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 1009 resp -> 
ProtobufUtil.getRegionInfos(resp))) 1010 .serverName(serverName).call(); 1011 } 1012 1013 @Override 1014 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 1015 if (tableName.equals(META_TABLE_NAME)) { 1016 return connection.registry.getMetaRegionLocations() 1017 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 1018 .collect(Collectors.toList())); 1019 } else { 1020 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 1021 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 1022 } 1023 } 1024 1025 @Override 1026 public CompletableFuture<Void> flush(TableName tableName) { 1027 return flush(tableName, Collections.emptyList()); 1028 } 1029 1030 @Override 1031 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 1032 Preconditions.checkNotNull(columnFamily, 1033 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 1034 return flush(tableName, Collections.singletonList(columnFamily)); 1035 } 1036 1037 @Override 1038 public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { 1039 // This is for keeping compatibility with old implementation. 1040 // If the server version is lower than the client version, it's possible that the 1041 // flushTable method is not present in the server side, if so, we need to fall back 1042 // to the old implementation. 
1043 Preconditions.checkNotNull(columnFamilyList, 1044 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 1045 List<byte[]> columnFamilies = columnFamilyList.stream() 1046 .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); 1047 FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, 1048 ng.getNonceGroup(), ng.newNonce()); 1049 CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( 1050 tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), 1051 (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); 1052 CompletableFuture<Void> future = new CompletableFuture<>(); 1053 addListener(procFuture, (ret, error) -> { 1054 if (error != null) { 1055 if ( 1056 error instanceof TableNotFoundException || error instanceof TableNotEnabledException 1057 || error instanceof NoSuchColumnFamilyException 1058 ) { 1059 future.completeExceptionally(error); 1060 } else if (error instanceof DoNotRetryIOException) { 1061 // usually this is caused by the method is not present on the server or 1062 // the hbase hadoop version does not match the running hadoop version. 1063 // if that happens, we need fall back to the old flush implementation. 1064 LOG.info("Unrecoverable error in master side. 
Fallback to FlushTableProcedure V1", error); 1065 legacyFlush(future, tableName, columnFamilies); 1066 } else { 1067 future.completeExceptionally(error); 1068 } 1069 } else { 1070 future.complete(ret); 1071 } 1072 }); 1073 return future; 1074 } 1075 1076 private void legacyFlush(CompletableFuture<Void> future, TableName tableName, 1077 List<byte[]> columnFamilies) { 1078 addListener(tableExists(tableName), (exists, err) -> { 1079 if (err != null) { 1080 future.completeExceptionally(err); 1081 } else if (!exists) { 1082 future.completeExceptionally(new TableNotFoundException(tableName)); 1083 } else { 1084 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 1085 if (err2 != null) { 1086 future.completeExceptionally(err2); 1087 } else if (!tableEnabled) { 1088 future.completeExceptionally(new TableNotEnabledException(tableName)); 1089 } else { 1090 Map<String, String> props = new HashMap<>(); 1091 if (columnFamilies != null && !columnFamilies.isEmpty()) { 1092 props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER 1093 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); 1094 } 1095 addListener( 1096 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 1097 (ret, err3) -> { 1098 if (err3 != null) { 1099 future.completeExceptionally(err3); 1100 } else { 1101 future.complete(ret); 1102 } 1103 }); 1104 } 1105 }); 1106 } 1107 }); 1108 } 1109 1110 @Override 1111 public CompletableFuture<Void> flushRegion(byte[] regionName) { 1112 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 1113 }); 1114 } 1115 1116 @Override 1117 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 1118 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
1119 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 1120 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 1121 }); 1122 } 1123 1124 /** 1125 * This method is for internal use only, where we need the response of the flush. 1126 * <p/> 1127 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1128 * API. 1129 */ 1130 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1131 boolean writeFlushWALMarker) { 1132 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1133 addListener(getRegionLocation(regionName), (location, err) -> { 1134 if (err != null) { 1135 future.completeExceptionally(err); 1136 return; 1137 } 1138 ServerName serverName = location.getServerName(); 1139 if (serverName == null) { 1140 future 1141 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1142 return; 1143 } 1144 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1145 (ret, err2) -> { 1146 if (err2 != null) { 1147 future.completeExceptionally(err2); 1148 } else { 1149 future.complete(ret); 1150 } 1151 }); 1152 }); 1153 return future; 1154 } 1155 1156 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1157 byte[] columnFamily, boolean writeFlushWALMarker) { 1158 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1159 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1160 FlushRegionResponse> adminCall(controller, stub, 1161 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1162 writeFlushWALMarker), 1163 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1164 .call(); 1165 } 1166 1167 @Override 1168 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1169 CompletableFuture<Void> future = new 
CompletableFuture<>(); 1170 addListener(getRegions(sn), (hRegionInfos, err) -> { 1171 if (err != null) { 1172 future.completeExceptionally(err); 1173 return; 1174 } 1175 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1176 if (hRegionInfos != null) { 1177 hRegionInfos 1178 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1179 }))); 1180 } 1181 addListener(CompletableFuture.allOf( 1182 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1183 if (err2 != null) { 1184 future.completeExceptionally(err2); 1185 } else { 1186 future.complete(ret); 1187 } 1188 }); 1189 }); 1190 return future; 1191 } 1192 1193 @Override 1194 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1195 return compact(tableName, null, false, compactType); 1196 } 1197 1198 @Override 1199 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1200 CompactType compactType) { 1201 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1202 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1203 return compact(tableName, columnFamily, false, compactType); 1204 } 1205 1206 @Override 1207 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1208 return compactRegion(regionName, null, false); 1209 } 1210 1211 @Override 1212 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1213 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
1214 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1215 return compactRegion(regionName, columnFamily, false); 1216 } 1217 1218 @Override 1219 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1220 return compact(tableName, null, true, compactType); 1221 } 1222 1223 @Override 1224 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1225 CompactType compactType) { 1226 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1227 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1228 return compact(tableName, columnFamily, true, compactType); 1229 } 1230 1231 @Override 1232 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1233 return compactRegion(regionName, null, true); 1234 } 1235 1236 @Override 1237 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1238 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
1239 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1240 return compactRegion(regionName, columnFamily, true); 1241 } 1242 1243 @Override 1244 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1245 return compactRegionServer(sn, false); 1246 } 1247 1248 @Override 1249 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1250 return compactRegionServer(sn, true); 1251 } 1252 1253 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1254 CompletableFuture<Void> future = new CompletableFuture<>(); 1255 addListener(getRegions(sn), (hRegionInfos, err) -> { 1256 if (err != null) { 1257 future.completeExceptionally(err); 1258 return; 1259 } 1260 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1261 if (hRegionInfos != null) { 1262 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1263 } 1264 addListener(CompletableFuture.allOf( 1265 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1266 if (err2 != null) { 1267 future.completeExceptionally(err2); 1268 } else { 1269 future.complete(ret); 1270 } 1271 }); 1272 }); 1273 return future; 1274 } 1275 1276 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1277 boolean major) { 1278 CompletableFuture<Void> future = new CompletableFuture<>(); 1279 addListener(getRegionLocation(regionName), (location, err) -> { 1280 if (err != null) { 1281 future.completeExceptionally(err); 1282 return; 1283 } 1284 ServerName serverName = location.getServerName(); 1285 if (serverName == null) { 1286 future 1287 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1288 return; 1289 } 1290 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1291 (ret, err2) -> { 1292 if (err2 != null) { 1293 future.completeExceptionally(err2); 1294 } else { 
1295 future.complete(ret); 1296 } 1297 }); 1298 }); 1299 return future; 1300 } 1301 1302 /** 1303 * List all region locations for the specific table. 1304 */ 1305 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1306 if (TableName.META_TABLE_NAME.equals(tableName)) { 1307 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1308 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1309 if (err != null) { 1310 future.completeExceptionally(err); 1311 } else if ( 1312 metaRegions == null || metaRegions.isEmpty() 1313 || metaRegions.getDefaultRegionLocation() == null 1314 ) { 1315 future.completeExceptionally(new IOException("meta region does not found")); 1316 } else { 1317 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1318 } 1319 }); 1320 return future; 1321 } else { 1322 // For non-meta table, we fetch all locations by scanning hbase:meta table 1323 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1324 } 1325 } 1326 1327 /** 1328 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1329 */ 1330 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1331 CompactType compactType) { 1332 CompletableFuture<Void> future = new CompletableFuture<>(); 1333 1334 switch (compactType) { 1335 case MOB: 1336 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1337 if (err != null) { 1338 future.completeExceptionally(err); 1339 return; 1340 } 1341 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1342 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1343 if (err2 != null) { 1344 future.completeExceptionally(err2); 1345 } else { 1346 future.complete(ret); 1347 } 1348 }); 1349 }); 1350 break; 1351 case NORMAL: 1352 addListener(getTableHRegionLocations(tableName), 
(locations, err) -> { 1353 if (err != null) { 1354 future.completeExceptionally(err); 1355 return; 1356 } 1357 if (locations == null || locations.isEmpty()) { 1358 future.completeExceptionally(new TableNotFoundException(tableName)); 1359 return; 1360 } 1361 CompletableFuture<?>[] compactFutures = 1362 locations.stream().filter(l -> l.getRegion() != null) 1363 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1364 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1365 .toArray(CompletableFuture<?>[]::new); 1366 // future complete unless all of the compact futures are completed. 1367 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1368 if (err2 != null) { 1369 future.completeExceptionally(err2); 1370 } else { 1371 future.complete(ret); 1372 } 1373 }); 1374 }); 1375 break; 1376 default: 1377 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1378 } 1379 return future; 1380 } 1381 1382 /** 1383 * Compact the region at specific region server. 1384 */ 1385 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1386 final boolean major, byte[] columnFamily) { 1387 return this.<Void> newAdminCaller().serverName(sn) 1388 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1389 Void> adminCall(controller, stub, 1390 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1391 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1392 .call(); 1393 } 1394 1395 private byte[] toEncodeRegionName(byte[] regionName) { 1396 return RegionInfo.isEncodedRegionName(regionName) 1397 ? 
regionName 1398 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1399 } 1400 1401 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1402 CompletableFuture<TableName> result) { 1403 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1404 if (err != null) { 1405 result.completeExceptionally(err); 1406 return; 1407 } 1408 RegionInfo regionInfo = location.getRegion(); 1409 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1410 result.completeExceptionally( 1411 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1412 return; 1413 } 1414 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1415 if (!tableName.get().equals(regionInfo.getTable())) { 1416 // tables of this two region should be same. 1417 result.completeExceptionally( 1418 new IllegalArgumentException("Cannot merge regions from two different tables " 1419 + tableName.get() + " and " + regionInfo.getTable())); 1420 } else { 1421 result.complete(tableName.get()); 1422 } 1423 } 1424 }); 1425 } 1426 1427 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1428 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1429 CompletableFuture<TableName> future = new CompletableFuture<>(); 1430 for (byte[] encodedRegionName : encodedRegionNames) { 1431 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1432 } 1433 return future; 1434 } 1435 1436 @Override 1437 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1438 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1439 } 1440 1441 @Override 1442 public CompletableFuture<Boolean> isMergeEnabled() { 1443 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1444 } 1445 1446 @Override 1447 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1448 return setSplitOrMergeOn(enabled, 
drainSplits, MasterSwitchType.SPLIT); 1449 } 1450 1451 @Override 1452 public CompletableFuture<Boolean> isSplitEnabled() { 1453 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1454 } 1455 1456 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1457 MasterSwitchType switchType) { 1458 SetSplitOrMergeEnabledRequest request = 1459 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1460 return this.<Boolean> newMasterCaller() 1461 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1462 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1463 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1464 (resp) -> resp.getPrevValueList().get(0))) 1465 .call(); 1466 } 1467 1468 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1469 IsSplitOrMergeEnabledRequest request = 1470 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1471 return this.<Boolean> newMasterCaller() 1472 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1473 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1474 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1475 .call(); 1476 } 1477 1478 @Override 1479 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1480 if (nameOfRegionsToMerge.size() < 2) { 1481 return failedFuture(new IllegalArgumentException( 1482 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1483 } 1484 CompletableFuture<Void> future = new CompletableFuture<>(); 1485 byte[][] encodedNameOfRegionsToMerge = 1486 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1487 1488 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1489 if (err != null) { 1490 future.completeExceptionally(err); 1491 return; 1492 } 1493 1494 
final MergeTableRegionsRequest request; 1495 try { 1496 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1497 forcible, ng.getNonceGroup(), ng.newNonce()); 1498 } catch (DeserializationException e) { 1499 future.completeExceptionally(e); 1500 return; 1501 } 1502 1503 addListener( 1504 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1505 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1506 (ret, err2) -> { 1507 if (err2 != null) { 1508 future.completeExceptionally(err2); 1509 } else { 1510 future.complete(ret); 1511 } 1512 }); 1513 }); 1514 return future; 1515 } 1516 1517 @Override 1518 public CompletableFuture<Void> split(TableName tableName) { 1519 CompletableFuture<Void> future = new CompletableFuture<>(); 1520 addListener(tableExists(tableName), (exist, error) -> { 1521 if (error != null) { 1522 future.completeExceptionally(error); 1523 return; 1524 } 1525 if (!exist) { 1526 future.completeExceptionally(new TableNotFoundException(tableName)); 1527 return; 1528 } 1529 addListener(metaTable 1530 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1531 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1532 ClientMetaTableAccessor.QueryType.REGION)) 1533 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1534 ClientMetaTableAccessor.QueryType.REGION))), 1535 (results, err2) -> { 1536 if (err2 != null) { 1537 future.completeExceptionally(err2); 1538 return; 1539 } 1540 if (results != null && !results.isEmpty()) { 1541 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1542 for (Result r : results) { 1543 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1544 continue; 1545 } 1546 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1547 if (rl != null) { 1548 for (HRegionLocation h : rl.getRegionLocations()) { 1549 if (h != null && 
h.getServerName() != null) { 1550 RegionInfo hri = h.getRegion(); 1551 if ( 1552 hri == null || hri.isSplitParent() 1553 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1554 ) { 1555 continue; 1556 } 1557 splitFutures.add(split(hri, null)); 1558 } 1559 } 1560 } 1561 } 1562 addListener( 1563 CompletableFuture 1564 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1565 (ret, exception) -> { 1566 if (exception != null) { 1567 future.completeExceptionally(exception); 1568 return; 1569 } 1570 future.complete(ret); 1571 }); 1572 } else { 1573 future.complete(null); 1574 } 1575 }); 1576 }); 1577 return future; 1578 } 1579 1580 @Override 1581 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1582 CompletableFuture<Void> result = new CompletableFuture<>(); 1583 if (splitPoint == null) { 1584 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1585 } 1586 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1587 (loc, err) -> { 1588 if (err != null) { 1589 result.completeExceptionally(err); 1590 } else if (loc == null || loc.getRegion() == null) { 1591 result.completeExceptionally(new IllegalArgumentException( 1592 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1593 } else { 1594 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1595 if (err2 != null) { 1596 result.completeExceptionally(err2); 1597 } else { 1598 result.complete(ret); 1599 } 1600 1601 }); 1602 } 1603 }); 1604 return result; 1605 } 1606 1607 @Override 1608 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1609 CompletableFuture<Void> future = new CompletableFuture<>(); 1610 addListener(getRegionLocation(regionName), (location, err) -> { 1611 if (err != null) { 1612 future.completeExceptionally(err); 1613 return; 1614 } 1615 RegionInfo regionInfo = location.getRegion(); 1616 if (regionInfo.getReplicaId() != 
RegionInfo.DEFAULT_REPLICA_ID) { 1617 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1618 + "Replicas are auto-split when their primary is split.")); 1619 return; 1620 } 1621 ServerName serverName = location.getServerName(); 1622 if (serverName == null) { 1623 future 1624 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1625 return; 1626 } 1627 addListener(split(regionInfo, null), (ret, err2) -> { 1628 if (err2 != null) { 1629 future.completeExceptionally(err2); 1630 } else { 1631 future.complete(ret); 1632 } 1633 }); 1634 }); 1635 return future; 1636 } 1637 1638 @Override 1639 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1640 Preconditions.checkNotNull(splitPoint, 1641 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1642 CompletableFuture<Void> future = new CompletableFuture<>(); 1643 addListener(getRegionLocation(regionName), (location, err) -> { 1644 if (err != null) { 1645 future.completeExceptionally(err); 1646 return; 1647 } 1648 RegionInfo regionInfo = location.getRegion(); 1649 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1650 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. 
" 1651 + "Replicas are auto-split when their primary is split.")); 1652 return; 1653 } 1654 ServerName serverName = location.getServerName(); 1655 if (serverName == null) { 1656 future 1657 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1658 return; 1659 } 1660 if ( 1661 regionInfo.getStartKey() != null 1662 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1663 ) { 1664 future.completeExceptionally( 1665 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1666 return; 1667 } 1668 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1669 if (err2 != null) { 1670 future.completeExceptionally(err2); 1671 } else { 1672 future.complete(ret); 1673 } 1674 }); 1675 }); 1676 return future; 1677 } 1678 1679 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1680 CompletableFuture<Void> future = new CompletableFuture<>(); 1681 TableName tableName = hri.getTable(); 1682 final SplitTableRegionRequest request; 1683 try { 1684 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1685 ng.newNonce()); 1686 } catch (DeserializationException e) { 1687 future.completeExceptionally(e); 1688 return future; 1689 } 1690 1691 addListener( 1692 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1693 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1694 (ret, err2) -> { 1695 if (err2 != null) { 1696 future.completeExceptionally(err2); 1697 } else { 1698 future.complete(ret); 1699 } 1700 }); 1701 return future; 1702 } 1703 1704 @Override 1705 public CompletableFuture<Void> truncateRegion(byte[] regionName) { 1706 CompletableFuture<Void> future = new CompletableFuture<>(); 1707 addListener(getRegionLocation(regionName), (location, err) -> { 1708 if (err != null) { 1709 future.completeExceptionally(err); 1710 return; 1711 } 1712 RegionInfo regionInfo = 
location.getRegion(); 1713 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1714 future.completeExceptionally(new IllegalArgumentException( 1715 "Can't truncate replicas directly.Replicas are auto-truncated " 1716 + "when their primary is truncated.")); 1717 return; 1718 } 1719 ServerName serverName = location.getServerName(); 1720 if (serverName == null) { 1721 future 1722 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1723 return; 1724 } 1725 addListener(truncateRegion(regionInfo), (ret, err2) -> { 1726 if (err2 != null) { 1727 future.completeExceptionally(err2); 1728 } else { 1729 future.complete(ret); 1730 } 1731 }); 1732 }); 1733 return future; 1734 } 1735 1736 private CompletableFuture<Void> truncateRegion(final RegionInfo hri) { 1737 CompletableFuture<Void> future = new CompletableFuture<>(); 1738 TableName tableName = hri.getTable(); 1739 final MasterProtos.TruncateRegionRequest request; 1740 try { 1741 request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce()); 1742 } catch (DeserializationException e) { 1743 future.completeExceptionally(e); 1744 return future; 1745 } 1746 addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion, 1747 MasterProtos.TruncateRegionResponse::getProcId, 1748 new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1749 if (err2 != null) { 1750 future.completeExceptionally(err2); 1751 } else { 1752 future.complete(ret); 1753 } 1754 }); 1755 return future; 1756 } 1757 1758 @Override 1759 public CompletableFuture<Void> assign(byte[] regionName) { 1760 CompletableFuture<Void> future = new CompletableFuture<>(); 1761 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1762 if (err != null) { 1763 future.completeExceptionally(err); 1764 return; 1765 } 1766 addListener( 1767 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1768 .action((controller, stub) -> 
this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1769 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1770 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1771 .call(), 1772 (ret, err2) -> { 1773 if (err2 != null) { 1774 future.completeExceptionally(err2); 1775 } else { 1776 future.complete(ret); 1777 } 1778 }); 1779 }); 1780 return future; 1781 } 1782 1783 @Override 1784 public CompletableFuture<Void> unassign(byte[] regionName) { 1785 CompletableFuture<Void> future = new CompletableFuture<>(); 1786 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1787 if (err != null) { 1788 future.completeExceptionally(err); 1789 return; 1790 } 1791 addListener( 1792 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1793 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1794 Void> call(controller, stub, 1795 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1796 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1797 .call(), 1798 (ret, err2) -> { 1799 if (err2 != null) { 1800 future.completeExceptionally(err2); 1801 } else { 1802 future.complete(ret); 1803 } 1804 }); 1805 }); 1806 return future; 1807 } 1808 1809 @Override 1810 public CompletableFuture<Void> offline(byte[] regionName) { 1811 CompletableFuture<Void> future = new CompletableFuture<>(); 1812 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1813 if (err != null) { 1814 future.completeExceptionally(err); 1815 return; 1816 } 1817 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1818 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1819 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1820 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1821 .call(), (ret, err2) -> { 1822 if (err2 != null) { 1823 
future.completeExceptionally(err2); 1824 } else { 1825 future.complete(ret); 1826 } 1827 }); 1828 }); 1829 return future; 1830 } 1831 1832 @Override 1833 public CompletableFuture<Void> move(byte[] regionName) { 1834 CompletableFuture<Void> future = new CompletableFuture<>(); 1835 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1836 if (err != null) { 1837 future.completeExceptionally(err); 1838 return; 1839 } 1840 addListener( 1841 moveRegion(regionInfo, 1842 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1843 (ret, err2) -> { 1844 if (err2 != null) { 1845 future.completeExceptionally(err2); 1846 } else { 1847 future.complete(ret); 1848 } 1849 }); 1850 }); 1851 return future; 1852 } 1853 1854 @Override 1855 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1856 Preconditions.checkNotNull(destServerName, 1857 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1858 CompletableFuture<Void> future = new CompletableFuture<>(); 1859 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1860 if (err != null) { 1861 future.completeExceptionally(err); 1862 return; 1863 } 1864 addListener( 1865 moveRegion(regionInfo, RequestConverter 1866 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1867 (ret, err2) -> { 1868 if (err2 != null) { 1869 future.completeExceptionally(err2); 1870 } else { 1871 future.complete(ret); 1872 } 1873 }); 1874 }); 1875 return future; 1876 } 1877 1878 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1879 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1880 .action( 1881 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1882 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1883 .call(); 1884 } 1885 1886 @Override 1887 public CompletableFuture<Void> 
setQuota(QuotaSettings quota) { 1888 return this.<Void> newMasterCaller() 1889 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1890 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1891 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1892 .call(); 1893 } 1894 1895 @Override 1896 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1897 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1898 Scan scan = QuotaTableUtil.makeScan(filter); 1899 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1900 new AdvancedScanResultConsumer() { 1901 List<QuotaSettings> settings = new ArrayList<>(); 1902 1903 @Override 1904 public void onNext(Result[] results, ScanController controller) { 1905 for (Result result : results) { 1906 try { 1907 QuotaTableUtil.parseResultToCollection(result, settings); 1908 } catch (IOException e) { 1909 controller.terminate(); 1910 future.completeExceptionally(e); 1911 } 1912 } 1913 } 1914 1915 @Override 1916 public void onError(Throwable error) { 1917 future.completeExceptionally(error); 1918 } 1919 1920 @Override 1921 public void onComplete() { 1922 future.complete(settings); 1923 } 1924 }); 1925 return future; 1926 } 1927 1928 @Override 1929 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1930 boolean enabled) { 1931 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1932 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1933 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1934 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1935 } 1936 1937 @Override 1938 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1939 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1940 
RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1941 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1942 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1943 } 1944 1945 @Override 1946 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1947 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1948 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1949 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1950 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1951 } 1952 1953 @Override 1954 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1955 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1956 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1957 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1958 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1959 } 1960 1961 @Override 1962 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1963 return this.<ReplicationPeerConfig> newMasterCaller() 1964 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1965 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1966 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1967 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1968 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1969 .call(); 1970 } 1971 1972 @Override 1973 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1974 ReplicationPeerConfig peerConfig) { 1975 return this.<UpdateReplicationPeerConfigRequest, 1976 UpdateReplicationPeerConfigResponse> procedureCall( 1977 
RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1978 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1979 (resp) -> resp.getProcId(), 1980 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1981 } 1982 1983 @Override 1984 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1985 SyncReplicationState clusterState) { 1986 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1987 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1988 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1989 clusterState), 1990 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1991 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1992 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1993 } 1994 1995 @Override 1996 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1997 Map<TableName, List<String>> tableCfs) { 1998 if (tableCfs == null) { 1999 return failedFuture(new ReplicationException("tableCfs is null")); 2000 } 2001 2002 CompletableFuture<Void> future = new CompletableFuture<>(); 2003 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 2004 if (!completeExceptionally(future, error)) { 2005 ReplicationPeerConfig newPeerConfig = 2006 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 2007 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 2008 if (!completeExceptionally(future, error)) { 2009 future.complete(result); 2010 } 2011 }); 2012 } 2013 }); 2014 return future; 2015 } 2016 2017 @Override 2018 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 2019 Map<TableName, List<String>> tableCfs) { 2020 if (tableCfs == null) { 2021 return failedFuture(new ReplicationException("tableCfs is null")); 2022 } 2023 2024 
CompletableFuture<Void> future = new CompletableFuture<>(); 2025 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 2026 if (!completeExceptionally(future, error)) { 2027 ReplicationPeerConfig newPeerConfig = null; 2028 try { 2029 newPeerConfig = ReplicationPeerConfigUtil 2030 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 2031 } catch (ReplicationException e) { 2032 future.completeExceptionally(e); 2033 return; 2034 } 2035 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 2036 if (!completeExceptionally(future, error)) { 2037 future.complete(result); 2038 } 2039 }); 2040 } 2041 }); 2042 return future; 2043 } 2044 2045 @Override 2046 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 2047 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 2048 } 2049 2050 @Override 2051 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 2052 Preconditions.checkNotNull(pattern, 2053 "pattern is null. 
If you don't specify a pattern, use listReplicationPeers() instead"); 2054 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 2055 } 2056 2057 private CompletableFuture<List<ReplicationPeerDescription>> 2058 listReplicationPeers(ListReplicationPeersRequest request) { 2059 return this.<List<ReplicationPeerDescription>> newMasterCaller() 2060 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 2061 List<ReplicationPeerDescription>> call(controller, stub, request, 2062 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 2063 (resp) -> resp.getPeerDescList().stream() 2064 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 2065 .collect(Collectors.toList()))) 2066 .call(); 2067 } 2068 2069 @Override 2070 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 2071 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 2072 addListener(listTableDescriptors(), (tables, error) -> { 2073 if (!completeExceptionally(future, error)) { 2074 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 2075 tables.forEach(table -> { 2076 Map<String, Integer> cfs = new HashMap<>(); 2077 Stream.of(table.getColumnFamilies()) 2078 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 2079 .forEach(column -> { 2080 cfs.put(column.getNameAsString(), column.getScope()); 2081 }); 2082 if (!cfs.isEmpty()) { 2083 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 2084 } 2085 }); 2086 future.complete(replicatedTableCFs); 2087 } 2088 }); 2089 return future; 2090 } 2091 2092 @Override 2093 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 2094 SnapshotProtos.SnapshotDescription snapshot = 2095 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2096 try { 2097 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2098 } catch (IllegalArgumentException e) { 2099 return 
failedFuture(e); 2100 } 2101 CompletableFuture<Void> future = new CompletableFuture<>(); 2102 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2103 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 2104 addListener(this.<SnapshotResponse> newMasterCaller() 2105 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 2106 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 2107 .call(), (resp, err) -> { 2108 if (err != null) { 2109 future.completeExceptionally(err); 2110 return; 2111 } 2112 waitSnapshotFinish(snapshotDesc, future, resp); 2113 }); 2114 return future; 2115 } 2116 2117 // This is for keeping compatibility with old implementation. 2118 // If there is a procId field in the response, then the snapshot will be operated with a 2119 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 2120 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 2121 SnapshotResponse resp) { 2122 if (resp.hasProcId()) { 2123 getProcedureResult(resp.getProcId(), src -> null, future, 0); 2124 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 2125 } else { 2126 long expectedTimeout = resp.getExpectedTimeout(); 2127 TimerTask pollingTask = new TimerTask() { 2128 int tries = 0; 2129 long startTime = EnvironmentEdgeManager.currentTime(); 2130 long endTime = startTime + expectedTimeout; 2131 long maxPauseTime = expectedTimeout / maxAttempts; 2132 2133 @Override 2134 public void run(Timeout timeout) throws Exception { 2135 if (EnvironmentEdgeManager.currentTime() < endTime) { 2136 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 2137 if (err2 != null) { 2138 future.completeExceptionally(err2); 2139 } else if (done) { 2140 future.complete(null); 2141 } else { 2142 // retry again after pauseTime. 
2143 long pauseTime = 2144 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2145 pauseTime = Math.min(pauseTime, maxPauseTime); 2146 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 2147 } 2148 }); 2149 } else { 2150 future 2151 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 2152 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 2153 } 2154 } 2155 }; 2156 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2157 } 2158 } 2159 2160 @Override 2161 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 2162 return this.<Boolean> newMasterCaller() 2163 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 2164 Boolean> call(controller, stub, 2165 IsSnapshotDoneRequest.newBuilder() 2166 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2167 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 2168 .call(); 2169 } 2170 2171 @Override 2172 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 2173 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2174 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2175 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2176 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2177 } 2178 2179 @Override 2180 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2181 boolean restoreAcl) { 2182 CompletableFuture<Void> future = new CompletableFuture<>(); 2183 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2184 if (err != null) { 2185 future.completeExceptionally(err); 2186 return; 2187 } 2188 TableName tableName = null; 2189 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2190 for 
(SnapshotDescription snap : snapshotDescriptions) { 2191 if (snap.getName().equals(snapshotName)) { 2192 tableName = snap.getTableName(); 2193 break; 2194 } 2195 } 2196 } 2197 if (tableName == null) { 2198 future.completeExceptionally( 2199 new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist.")); 2200 return; 2201 } 2202 final TableName finalTableName = tableName; 2203 addListener(tableExists(finalTableName), (exists, err2) -> { 2204 if (err2 != null) { 2205 future.completeExceptionally(err2); 2206 } else if (!exists) { 2207 // if table does not exist, then just clone snapshot into new table. 2208 completeConditionalOnFuture(future, 2209 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2210 } else { 2211 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2212 if (err4 != null) { 2213 future.completeExceptionally(err4); 2214 } else if (!disabled) { 2215 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2216 } else { 2217 completeConditionalOnFuture(future, 2218 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2219 } 2220 }); 2221 } 2222 }); 2223 }); 2224 return future; 2225 } 2226 2227 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2228 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2229 if (takeFailSafeSnapshot) { 2230 CompletableFuture<Void> future = new CompletableFuture<>(); 2231 // Step.1 Take a snapshot of the current state 2232 String failSafeSnapshotSnapshotNameFormat = 2233 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2234 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2235 final String failSafeSnapshotSnapshotName = 2236 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2237 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2238 .replace("{restore.timestamp}", 
String.valueOf(EnvironmentEdgeManager.currentTime())); 2239 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2240 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2241 if (err != null) { 2242 future.completeExceptionally(err); 2243 } else { 2244 // Step.2 Restore snapshot 2245 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2246 (void2, err2) -> { 2247 if (err2 != null) { 2248 // Step.3.a Something went wrong during the restore and try to rollback. 2249 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2250 restoreAcl, null), (void3, err3) -> { 2251 if (err3 != null) { 2252 future.completeExceptionally(err3); 2253 } else { 2254 // If fail to restore snapshot but rollback successfully, delete the 2255 // restore-failsafe snapshot. 2256 LOG.info( 2257 "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2258 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> { 2259 if (err4 != null) { 2260 LOG.error("Unable to remove the failsafe snapshot: {}", 2261 failSafeSnapshotSnapshotName, err4); 2262 } 2263 String msg = 2264 "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot=" 2265 + failSafeSnapshotSnapshotName + " succeeded."; 2266 LOG.error(msg); 2267 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2268 }); 2269 } 2270 }); 2271 } else { 2272 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 
2273 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2274 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2275 if (err3 != null) { 2276 LOG.error( 2277 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2278 err3); 2279 } 2280 future.complete(ret3); 2281 }); 2282 } 2283 }); 2284 } 2285 }); 2286 return future; 2287 } else { 2288 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2289 } 2290 } 2291 2292 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2293 CompletableFuture<T> parentFuture) { 2294 addListener(parentFuture, (res, err) -> { 2295 if (err != null) { 2296 dependentFuture.completeExceptionally(err); 2297 } else { 2298 dependentFuture.complete(res); 2299 } 2300 }); 2301 } 2302 2303 @Override 2304 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2305 boolean restoreAcl, String customSFT) { 2306 CompletableFuture<Void> future = new CompletableFuture<>(); 2307 addListener(tableExists(tableName), (exists, err) -> { 2308 if (err != null) { 2309 future.completeExceptionally(err); 2310 } else if (exists) { 2311 future.completeExceptionally(new TableExistsException(tableName)); 2312 } else { 2313 completeConditionalOnFuture(future, 2314 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2315 } 2316 }); 2317 return future; 2318 } 2319 2320 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2321 boolean restoreAcl, String customSFT) { 2322 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2323 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2324 try { 2325 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2326 } catch (IllegalArgumentException e) { 2327 return failedFuture(e); 2328 } 2329 RestoreSnapshotRequest.Builder builder = 2330 
RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2331 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2332 if (customSFT != null) { 2333 builder.setCustomSFT(customSFT); 2334 } 2335 return waitProcedureResult(this.<Long> newMasterCaller() 2336 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2337 Long> call(controller, stub, builder.build(), 2338 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2339 .call(), result -> null); 2340 } 2341 2342 @Override 2343 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2344 return getCompletedSnapshots(null); 2345 } 2346 2347 @Override 2348 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2349 Preconditions.checkNotNull(pattern, 2350 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2351 return getCompletedSnapshots(pattern); 2352 } 2353 2354 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2355 return this.<List<SnapshotDescription>> newMasterCaller() 2356 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2357 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2358 GetCompletedSnapshotsRequest.newBuilder().build(), 2359 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2360 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2361 .call(); 2362 } 2363 2364 @Override 2365 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2366 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 
2367 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2368 return getCompletedSnapshots(tableNamePattern, null); 2369 } 2370 2371 @Override 2372 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2373 Pattern snapshotNamePattern) { 2374 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2375 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2376 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2377 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2378 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2379 } 2380 2381 private CompletableFuture<List<SnapshotDescription>> 2382 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2383 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2384 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2385 if (err != null) { 2386 future.completeExceptionally(err); 2387 return; 2388 } 2389 if (tableNames == null || tableNames.size() <= 0) { 2390 future.complete(Collections.emptyList()); 2391 return; 2392 } 2393 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2394 if (err2 != null) { 2395 future.completeExceptionally(err2); 2396 return; 2397 } 2398 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2399 future.complete(Collections.emptyList()); 2400 return; 2401 } 2402 future.complete(snapshotDescList.stream() 2403 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2404 .collect(Collectors.toList())); 2405 }); 2406 }); 2407 return future; 2408 } 2409 2410 @Override 2411 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2412 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2413 } 2414 2415 @Override 2416 public 
CompletableFuture<Void> deleteSnapshots() { 2417 return internalDeleteSnapshots(null, null); 2418 } 2419 2420 @Override 2421 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2422 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2423 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2424 return internalDeleteSnapshots(null, snapshotNamePattern); 2425 } 2426 2427 @Override 2428 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2429 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2430 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2431 return internalDeleteSnapshots(tableNamePattern, null); 2432 } 2433 2434 @Override 2435 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2436 Pattern snapshotNamePattern) { 2437 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2438 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2439 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
2440 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2441 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2442 } 2443 2444 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2445 Pattern snapshotNamePattern) { 2446 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2447 if (tableNamePattern == null) { 2448 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2449 } else { 2450 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2451 } 2452 CompletableFuture<Void> future = new CompletableFuture<>(); 2453 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2454 if (err != null) { 2455 future.completeExceptionally(err); 2456 return; 2457 } 2458 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2459 future.complete(null); 2460 return; 2461 } 2462 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2463 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2464 if (e != null) { 2465 future.completeExceptionally(e); 2466 } else { 2467 future.complete(v); 2468 } 2469 }); 2470 }); 2471 return future; 2472 } 2473 2474 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2475 return this.<Void> newMasterCaller() 2476 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2477 controller, stub, 2478 DeleteSnapshotRequest.newBuilder() 2479 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2480 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2481 .call(); 2482 } 2483 2484 @Override 2485 public CompletableFuture<Void> execProcedure(String signature, String instance, 2486 Map<String, String> props) { 2487 CompletableFuture<Void> future = new CompletableFuture<>(); 2488 ProcedureDescription procDesc = 2489 
ProtobufUtil.buildProcedureDescription(signature, instance, props); 2490 addListener(this.<Long> newMasterCaller() 2491 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2492 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2493 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2494 .call(), (expectedTimeout, err) -> { 2495 if (err != null) { 2496 future.completeExceptionally(err); 2497 return; 2498 } 2499 TimerTask pollingTask = new TimerTask() { 2500 int tries = 0; 2501 long startTime = EnvironmentEdgeManager.currentTime(); 2502 long endTime = startTime + expectedTimeout; 2503 long maxPauseTime = expectedTimeout / maxAttempts; 2504 2505 @Override 2506 public void run(Timeout timeout) throws Exception { 2507 if (EnvironmentEdgeManager.currentTime() < endTime) { 2508 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2509 if (err2 != null) { 2510 future.completeExceptionally(err2); 2511 return; 2512 } 2513 if (done) { 2514 future.complete(null); 2515 } else { 2516 // retry again after pauseTime. 2517 long pauseTime = 2518 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2519 pauseTime = Math.min(pauseTime, maxPauseTime); 2520 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2521 TimeUnit.MICROSECONDS); 2522 } 2523 }); 2524 } else { 2525 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2526 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2527 } 2528 } 2529 }; 2530 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 
2531 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2532 }); 2533 return future; 2534 } 2535 2536 @Override 2537 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2538 Map<String, String> props) { 2539 ProcedureDescription proDesc = 2540 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2541 return this.<byte[]> newMasterCaller() 2542 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2543 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2544 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2545 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2546 .call(); 2547 } 2548 2549 @Override 2550 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2551 Map<String, String> props) { 2552 ProcedureDescription proDesc = 2553 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2554 return this.<Boolean> newMasterCaller() 2555 .action( 2556 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2557 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2558 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2559 .call(); 2560 } 2561 2562 @Override 2563 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2564 return this.<Boolean> newMasterCaller().action( 2565 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2566 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2567 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2568 .call(); 2569 } 2570 2571 @Override 2572 public CompletableFuture<String> getProcedures() { 2573 return this.<String> newMasterCaller() 2574 
.action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2575 controller, stub, GetProceduresRequest.newBuilder().build(), 2576 (s, c, req, done) -> s.getProcedures(c, req, done), 2577 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2578 .call(); 2579 } 2580 2581 @Override 2582 public CompletableFuture<String> getLocks() { 2583 return this.<String> newMasterCaller() 2584 .action( 2585 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2586 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2587 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2588 .call(); 2589 } 2590 2591 @Override 2592 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2593 boolean offload) { 2594 return this.<Void> newMasterCaller() 2595 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2596 DecommissionRegionServersResponse, Void> call(controller, stub, 2597 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2598 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2599 .call(); 2600 } 2601 2602 @Override 2603 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2604 return this.<List<ServerName>> newMasterCaller() 2605 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2606 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2607 ListDecommissionedRegionServersRequest.newBuilder().build(), 2608 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2609 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2610 .collect(Collectors.toList()))) 2611 .call(); 2612 } 2613 2614 @Override 2615 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2616 List<byte[]> encodedRegionNames) { 2617 return this.<Void> newMasterCaller() 2618 
.action((controller, stub) -> this.<RecommissionRegionServerRequest, 2619 RecommissionRegionServerResponse, Void> call(controller, stub, 2620 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2621 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2622 .call(); 2623 } 2624 2625 /** 2626 * Get the region location for the passed region name. The region name may be a full region name 2627 * or encoded region name. If the region does not found, then it'll throw an 2628 * UnknownRegionException wrapped by a {@link CompletableFuture} 2629 * @param regionNameOrEncodedRegionName region name or encoded region name 2630 * @return region location, wrapped by a {@link CompletableFuture} 2631 */ 2632 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2633 if (regionNameOrEncodedRegionName == null) { 2634 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2635 } 2636 2637 CompletableFuture<Optional<HRegionLocation>> future; 2638 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2639 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2640 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2641 // old format encodedName, should be meta region 2642 future = connection.registry.getMetaRegionLocations() 2643 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2644 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2645 } else { 2646 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2647 regionNameOrEncodedRegionName); 2648 } 2649 } else { 2650 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2651 // it needs to throw out IllegalArgumentException in case tableName is passed in. 
2652 RegionInfo regionInfo; 2653 try { 2654 regionInfo = 2655 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2656 } catch (IOException ioe) { 2657 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2658 } 2659 2660 if (regionInfo.isMetaRegion()) { 2661 future = connection.registry.getMetaRegionLocations() 2662 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2663 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2664 .findFirst()); 2665 } else { 2666 future = 2667 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2668 } 2669 } 2670 2671 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2672 addListener(future, (location, err) -> { 2673 if (err != null) { 2674 returnedFuture.completeExceptionally(err); 2675 return; 2676 } 2677 if (!location.isPresent() || location.get().getRegion() == null) { 2678 returnedFuture.completeExceptionally( 2679 new UnknownRegionException("Invalid region name or encoded region name: " 2680 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2681 } else { 2682 returnedFuture.complete(location.get()); 2683 } 2684 }); 2685 return returnedFuture; 2686 } 2687 2688 /** 2689 * Get the region info for the passed region name. The region name may be a full region name or 2690 * encoded region name. 
If the region does not found, then it'll throw an UnknownRegionException 2691 * wrapped by a {@link CompletableFuture} 2692 * @return region info, wrapped by a {@link CompletableFuture} 2693 */ 2694 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2695 if (regionNameOrEncodedRegionName == null) { 2696 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2697 } 2698 2699 if ( 2700 Bytes.equals(regionNameOrEncodedRegionName, 2701 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2702 || Bytes.equals(regionNameOrEncodedRegionName, 2703 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2704 ) { 2705 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2706 } 2707 2708 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2709 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2710 if (err != null) { 2711 future.completeExceptionally(err); 2712 } else { 2713 future.complete(location.getRegion()); 2714 } 2715 }); 2716 return future; 2717 } 2718 2719 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2720 if (numRegions < 3) { 2721 throw new IllegalArgumentException("Must create at least three regions"); 2722 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2723 throw new IllegalArgumentException("Start key must be smaller than end key"); 2724 } 2725 if (numRegions == 3) { 2726 return new byte[][] { startKey, endKey }; 2727 } 2728 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2729 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2730 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2731 } 2732 return splitKeys; 2733 } 2734 2735 private void verifySplitKeys(byte[][] splitKeys) { 2736 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2737 // Verify there are no duplicate split keys 2738 byte[] lastKey = 
null; 2739 for (byte[] splitKey : splitKeys) { 2740 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2741 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2742 } 2743 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2744 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2745 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2746 } 2747 lastKey = splitKey; 2748 } 2749 } 2750 2751 private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> { 2752 2753 abstract void onFinished(); 2754 2755 abstract void onError(Throwable error); 2756 2757 @Override 2758 public void accept(T value, Throwable error) { 2759 if (error != null) { 2760 onError(error); 2761 return; 2762 } 2763 onFinished(); 2764 } 2765 } 2766 2767 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> { 2768 protected final TableName tableName; 2769 2770 TableProcedureBiConsumer(TableName tableName) { 2771 this.tableName = tableName; 2772 } 2773 2774 abstract String getOperationType(); 2775 2776 String getDescription() { 2777 return "Operation: " + getOperationType() + ", " + "Table Name: " 2778 + tableName.getNameWithNamespaceInclAsString(); 2779 } 2780 2781 @Override 2782 void onFinished() { 2783 LOG.info(getDescription() + " completed"); 2784 } 2785 2786 @Override 2787 void onError(Throwable error) { 2788 LOG.info(getDescription() + " failed with " + error.getMessage()); 2789 } 2790 } 2791 2792 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> { 2793 protected final String namespaceName; 2794 2795 NamespaceProcedureBiConsumer(String namespaceName) { 2796 this.namespaceName = namespaceName; 2797 } 2798 2799 abstract String getOperationType(); 2800 2801 String getDescription() { 2802 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2803 } 
2804 2805 @Override 2806 void onFinished() { 2807 LOG.info("{} completed", getDescription()); 2808 } 2809 2810 @Override 2811 void onError(Throwable error) { 2812 LOG.info("{} failed with {}", getDescription(), error.getMessage()); 2813 } 2814 } 2815 2816 private static class RestoreBackupSystemTableProcedureBiConsumer extends ProcedureBiConsumer { 2817 2818 @Override 2819 void onFinished() { 2820 LOG.info("RestoreBackupSystemTableProcedure completed"); 2821 } 2822 2823 @Override 2824 void onError(Throwable error) { 2825 LOG.info("RestoreBackupSystemTableProcedure failed with {}", error.getMessage()); 2826 } 2827 } 2828 2829 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2830 2831 CreateTableProcedureBiConsumer(TableName tableName) { 2832 super(tableName); 2833 } 2834 2835 @Override 2836 String getOperationType() { 2837 return "CREATE"; 2838 } 2839 } 2840 2841 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2842 2843 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2844 super(tableName); 2845 } 2846 2847 @Override 2848 String getOperationType() { 2849 return "MODIFY"; 2850 } 2851 } 2852 2853 private static class ReopenTableRegionsProcedureBiConsumer extends TableProcedureBiConsumer { 2854 2855 ReopenTableRegionsProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2856 super(tableName); 2857 } 2858 2859 @Override 2860 String getOperationType() { 2861 return "REOPEN_TABLE_REGIONS"; 2862 } 2863 } 2864 2865 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2866 extends TableProcedureBiConsumer { 2867 2868 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2869 super(tableName); 2870 } 2871 2872 @Override 2873 String getOperationType() { 2874 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2875 } 2876 } 2877 2878 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2879 2880 
DeleteTableProcedureBiConsumer(TableName tableName) { 2881 super(tableName); 2882 } 2883 2884 @Override 2885 String getOperationType() { 2886 return "DELETE"; 2887 } 2888 2889 @Override 2890 void onFinished() { 2891 connection.getLocator().clearCache(this.tableName); 2892 super.onFinished(); 2893 } 2894 } 2895 2896 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2897 2898 TruncateTableProcedureBiConsumer(TableName tableName) { 2899 super(tableName); 2900 } 2901 2902 @Override 2903 String getOperationType() { 2904 return "TRUNCATE"; 2905 } 2906 } 2907 2908 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2909 2910 EnableTableProcedureBiConsumer(TableName tableName) { 2911 super(tableName); 2912 } 2913 2914 @Override 2915 String getOperationType() { 2916 return "ENABLE"; 2917 } 2918 } 2919 2920 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2921 2922 DisableTableProcedureBiConsumer(TableName tableName) { 2923 super(tableName); 2924 } 2925 2926 @Override 2927 String getOperationType() { 2928 return "DISABLE"; 2929 } 2930 } 2931 2932 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2933 2934 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2935 super(tableName); 2936 } 2937 2938 @Override 2939 String getOperationType() { 2940 return "ADD_COLUMN_FAMILY"; 2941 } 2942 } 2943 2944 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2945 2946 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2947 super(tableName); 2948 } 2949 2950 @Override 2951 String getOperationType() { 2952 return "DELETE_COLUMN_FAMILY"; 2953 } 2954 } 2955 2956 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2957 2958 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2959 super(tableName); 2960 } 2961 2962 @Override 2963 String 
getOperationType() { 2964 return "MODIFY_COLUMN_FAMILY"; 2965 } 2966 } 2967 2968 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2969 extends TableProcedureBiConsumer { 2970 2971 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2972 super(tableName); 2973 } 2974 2975 @Override 2976 String getOperationType() { 2977 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2978 } 2979 } 2980 2981 private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { 2982 2983 FlushTableProcedureBiConsumer(TableName tableName) { 2984 super(tableName); 2985 } 2986 2987 @Override 2988 String getOperationType() { 2989 return "FLUSH"; 2990 } 2991 } 2992 2993 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2994 2995 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2996 super(namespaceName); 2997 } 2998 2999 @Override 3000 String getOperationType() { 3001 return "CREATE_NAMESPACE"; 3002 } 3003 } 3004 3005 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 3006 3007 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 3008 super(namespaceName); 3009 } 3010 3011 @Override 3012 String getOperationType() { 3013 return "DELETE_NAMESPACE"; 3014 } 3015 } 3016 3017 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 3018 3019 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 3020 super(namespaceName); 3021 } 3022 3023 @Override 3024 String getOperationType() { 3025 return "MODIFY_NAMESPACE"; 3026 } 3027 } 3028 3029 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3030 3031 MergeTableRegionProcedureBiConsumer(TableName tableName) { 3032 super(tableName); 3033 } 3034 3035 @Override 3036 String getOperationType() { 3037 return "MERGE_REGIONS"; 3038 } 3039 } 3040 3041 private static class SplitTableRegionProcedureBiConsumer 
extends TableProcedureBiConsumer { 3042 3043 SplitTableRegionProcedureBiConsumer(TableName tableName) { 3044 super(tableName); 3045 } 3046 3047 @Override 3048 String getOperationType() { 3049 return "SPLIT_REGION"; 3050 } 3051 } 3052 3053 private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3054 3055 TruncateRegionProcedureBiConsumer(TableName tableName) { 3056 super(tableName); 3057 } 3058 3059 @Override 3060 String getOperationType() { 3061 return "TRUNCATE_REGION"; 3062 } 3063 } 3064 3065 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 3066 SnapshotProcedureBiConsumer(TableName tableName) { 3067 super(tableName); 3068 } 3069 3070 @Override 3071 String getOperationType() { 3072 return "SNAPSHOT"; 3073 } 3074 } 3075 3076 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> { 3077 private final String peerId; 3078 private final Supplier<String> getOperation; 3079 3080 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 3081 this.peerId = peerId; 3082 this.getOperation = getOperation; 3083 } 3084 3085 String getDescription() { 3086 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 3087 } 3088 3089 @Override 3090 void onFinished() { 3091 LOG.info("{} completed", getDescription()); 3092 } 3093 3094 @Override 3095 void onError(Throwable error) { 3096 LOG.info("{} failed with {}", getDescription(), error.getMessage()); 3097 } 3098 } 3099 3100 private static final class RollAllWALWritersBiConsumer 3101 extends ProcedureBiConsumer<Map<ServerName, Long>> { 3102 3103 @Override 3104 void onFinished() { 3105 LOG.info("Rolling all WAL writers completed"); 3106 } 3107 3108 @Override 3109 void onError(Throwable error) { 3110 LOG.warn("Rolling all WAL writers failed with {}", error.getMessage()); 3111 } 3112 } 3113 3114 private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture, 3115 Converter<T, 
ByteString> converter) { 3116 CompletableFuture<T> future = new CompletableFuture<>(); 3117 addListener(procFuture, (procId, error) -> { 3118 if (error != null) { 3119 future.completeExceptionally(error); 3120 return; 3121 } 3122 getProcedureResult(procId, converter, future, 0); 3123 }); 3124 return future; 3125 } 3126 3127 private <T> void getProcedureResult(long procId, Converter<T, ByteString> converter, 3128 CompletableFuture<T> future, int retries) { 3129 addListener( 3130 this.<GetProcedureResultResponse> newMasterCaller() 3131 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 3132 GetProcedureResultResponse> call(controller, stub, 3133 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 3134 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 3135 .call(), 3136 (response, error) -> { 3137 if (error != null) { 3138 Throwable exc = ConnectionUtils.translateException(error); 3139 if (exc instanceof DoNotRetryIOException) { 3140 // stop retrying on DNRIOE 3141 future.completeExceptionally(exc); 3142 } else { 3143 LOG.warn("failed to get the procedure result procId={}", procId, exc); 3144 retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), 3145 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3146 } 3147 return; 3148 } 3149 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 3150 retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), 3151 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3152 return; 3153 } 3154 if (response.hasException()) { 3155 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 3156 future.completeExceptionally(ioe); 3157 } else { 3158 try { 3159 future.complete(converter.convert(response.getResult())); 3160 } catch (IOException e) { 3161 future.completeExceptionally(e); 3162 } 3163 } 3164 }); 3165 } 3166 3167 
private <T> CompletableFuture<T> failedFuture(Throwable error) { 3168 CompletableFuture<T> future = new CompletableFuture<>(); 3169 future.completeExceptionally(error); 3170 return future; 3171 } 3172 3173 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 3174 if (error != null) { 3175 future.completeExceptionally(error); 3176 return true; 3177 } 3178 return false; 3179 } 3180 3181 @Override 3182 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 3183 return getClusterMetrics(EnumSet.allOf(Option.class)); 3184 } 3185 3186 @Override 3187 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 3188 return this.<ClusterMetrics> newMasterCaller() 3189 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 3190 ClusterMetrics> call(controller, stub, 3191 RequestConverter.buildGetClusterStatusRequest(options), 3192 (s, c, req, done) -> s.getClusterStatus(c, req, done), 3193 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 3194 .call(); 3195 } 3196 3197 @Override 3198 public CompletableFuture<Void> shutdown() { 3199 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3200 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 3201 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 3202 resp -> null)) 3203 .call(); 3204 } 3205 3206 @Override 3207 public CompletableFuture<Void> stopMaster() { 3208 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3209 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 3210 controller, stub, StopMasterRequest.newBuilder().build(), 3211 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 3212 .call(); 3213 } 3214 3215 @Override 3216 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 3217 StopServerRequest request = RequestConverter 3218 
.buildStopServerRequest("Called by admin client " + this.connection.toString()); 3219 return this.<Void> newAdminCaller().priority(HIGH_QOS) 3220 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 3221 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 3222 resp -> null)) 3223 .serverName(serverName).call(); 3224 } 3225 3226 @Override 3227 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 3228 return this.<Void> newAdminCaller() 3229 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 3230 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 3231 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 3232 .serverName(serverName).call(); 3233 } 3234 3235 @Override 3236 public CompletableFuture<Void> updateConfiguration() { 3237 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3238 addListener( 3239 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 3240 (status, err) -> { 3241 if (err != null) { 3242 future.completeExceptionally(err); 3243 } else { 3244 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3245 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 3246 futures.add(updateConfiguration(status.getMasterName())); 3247 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 3248 addListener( 3249 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3250 (result, err2) -> { 3251 if (err2 != null) { 3252 future.completeExceptionally(err2); 3253 } else { 3254 future.complete(result); 3255 } 3256 }); 3257 } 3258 }); 3259 return future; 3260 } 3261 3262 @Override 3263 public CompletableFuture<Void> updateConfiguration(String groupName) { 3264 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3265 
addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3266 if (err != null) { 3267 future.completeExceptionally(err); 3268 } else if (rsGroupInfo == null) { 3269 future.completeExceptionally( 3270 new IllegalArgumentException("Group does not exist: " + groupName)); 3271 } else { 3272 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3273 if (err2 != null) { 3274 future.completeExceptionally(err2); 3275 } else { 3276 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3277 List<ServerName> groupServers = status.getServersName().stream() 3278 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3279 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3280 addListener( 3281 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3282 (result, err3) -> { 3283 if (err3 != null) { 3284 future.completeExceptionally(err3); 3285 } else { 3286 future.complete(result); 3287 } 3288 }); 3289 } 3290 }); 3291 } 3292 }); 3293 return future; 3294 } 3295 3296 @Override 3297 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3298 return this.<Void> newAdminCaller() 3299 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3300 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3301 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3302 .serverName(serverName).call(); 3303 } 3304 3305 @Override 3306 public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() { 3307 return this 3308 .<RollAllWALWritersRequest, RollAllWALWritersResponse, 3309 Map<ServerName, 3310 Long>> procedureCall( 3311 RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()), 3312 (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(), 3313 result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap() 3314 
.entrySet().stream().collect(Collectors 3315 .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)), 3316 new RollAllWALWritersBiConsumer()); 3317 } 3318 3319 @Override 3320 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3321 return this.<Void> newAdminCaller() 3322 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3323 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3324 RequestConverter.buildClearCompactionQueuesRequest(queues), 3325 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3326 .serverName(serverName).call(); 3327 } 3328 3329 @Override 3330 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3331 return this.<List<SecurityCapability>> newMasterCaller() 3332 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3333 List<SecurityCapability>> call(controller, stub, 3334 SecurityCapabilitiesRequest.newBuilder().build(), 3335 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3336 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3337 .call(); 3338 } 3339 3340 @Override 3341 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3342 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3343 } 3344 3345 @Override 3346 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3347 TableName tableName) { 3348 Preconditions.checkNotNull(tableName, 3349 "tableName is null. 
If you don't specify a tableName, use getRegionLoads() instead"); 3350 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3351 } 3352 3353 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3354 ServerName serverName) { 3355 return this.<List<RegionMetrics>> newAdminCaller() 3356 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3357 List<RegionMetrics>> adminCall(controller, stub, request, 3358 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3359 RegionMetricsBuilder::toRegionMetrics)) 3360 .serverName(serverName).call(); 3361 } 3362 3363 @Override 3364 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3365 return this.<Boolean> newMasterCaller() 3366 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3367 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3368 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3369 resp -> resp.getInMaintenanceMode())) 3370 .call(); 3371 } 3372 3373 @Override 3374 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3375 CompactType compactType) { 3376 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3377 3378 switch (compactType) { 3379 case MOB: 3380 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3381 if (err != null) { 3382 future.completeExceptionally(err); 3383 return; 3384 } 3385 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3386 3387 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3388 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3389 GetRegionInfoResponse> adminCall(controller, stub, 3390 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3391 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp 
-> resp)) 3392 .call(), (resp2, err2) -> { 3393 if (err2 != null) { 3394 future.completeExceptionally(err2); 3395 } else { 3396 if (resp2.hasCompactionState()) { 3397 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3398 } else { 3399 future.complete(CompactionState.NONE); 3400 } 3401 } 3402 }); 3403 }); 3404 break; 3405 case NORMAL: 3406 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3407 if (err != null) { 3408 future.completeExceptionally(err); 3409 return; 3410 } 3411 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3412 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3413 locations.stream().filter(loc -> loc.getServerName() != null) 3414 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3415 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3416 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3417 // If any region compaction state is MAJOR_AND_MINOR 3418 // the table compaction state is MAJOR_AND_MINOR, too. 
3419 if (err2 != null) { 3420 future.completeExceptionally(unwrapCompletionException(err2)); 3421 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3422 future.complete(regionState); 3423 } else { 3424 regionStates.add(regionState); 3425 } 3426 })); 3427 }); 3428 addListener( 3429 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3430 (ret, err3) -> { 3431 // If future not completed, check all regions's compaction state 3432 if (!future.isCompletedExceptionally() && !future.isDone()) { 3433 CompactionState state = CompactionState.NONE; 3434 for (CompactionState regionState : regionStates) { 3435 switch (regionState) { 3436 case MAJOR: 3437 if (state == CompactionState.MINOR) { 3438 future.complete(CompactionState.MAJOR_AND_MINOR); 3439 } else { 3440 state = CompactionState.MAJOR; 3441 } 3442 break; 3443 case MINOR: 3444 if (state == CompactionState.MAJOR) { 3445 future.complete(CompactionState.MAJOR_AND_MINOR); 3446 } else { 3447 state = CompactionState.MINOR; 3448 } 3449 break; 3450 case NONE: 3451 default: 3452 } 3453 } 3454 if (!future.isDone()) { 3455 future.complete(state); 3456 } 3457 } 3458 }); 3459 }); 3460 break; 3461 default: 3462 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3463 } 3464 3465 return future; 3466 } 3467 3468 @Override 3469 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3470 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3471 addListener(getRegionLocation(regionName), (location, err) -> { 3472 if (err != null) { 3473 future.completeExceptionally(err); 3474 return; 3475 } 3476 ServerName serverName = location.getServerName(); 3477 if (serverName == null) { 3478 future 3479 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3480 return; 3481 } 3482 addListener( 3483 this.<GetRegionInfoResponse> newAdminCaller() 3484 .action((controller, stub) -> 
this.<GetRegionInfoRequest, GetRegionInfoResponse, 3485 GetRegionInfoResponse> adminCall(controller, stub, 3486 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3487 true), 3488 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3489 .serverName(serverName).call(), 3490 (resp2, err2) -> { 3491 if (err2 != null) { 3492 future.completeExceptionally(err2); 3493 } else { 3494 if (resp2.hasCompactionState()) { 3495 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3496 } else { 3497 future.complete(CompactionState.NONE); 3498 } 3499 } 3500 }); 3501 }); 3502 return future; 3503 } 3504 3505 @Override 3506 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3507 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3508 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3509 return this.<Optional<Long>> newMasterCaller() 3510 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3511 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3512 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3513 ProtobufUtil::toOptionalTimestamp)) 3514 .call(); 3515 } 3516 3517 @Override 3518 public CompletableFuture<Optional<Long>> 3519 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3520 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3521 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3522 addListener(getRegionInfo(regionName), (region, err) -> { 3523 if (err != null) { 3524 future.completeExceptionally(err); 3525 return; 3526 } 3527 MajorCompactionTimestampForRegionRequest.Builder builder = 3528 MajorCompactionTimestampForRegionRequest.newBuilder(); 3529 builder.setRegion( 3530 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3531 
addListener(this.<Optional<Long>> newMasterCaller() 3532 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3533 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3534 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3535 ProtobufUtil::toOptionalTimestamp)) 3536 .call(), (timestamp, err2) -> { 3537 if (err2 != null) { 3538 future.completeExceptionally(err2); 3539 } else { 3540 future.complete(timestamp); 3541 } 3542 }); 3543 }); 3544 return future; 3545 } 3546 3547 @Override 3548 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3549 List<String> serverNamesList) { 3550 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3551 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3552 if (err != null) { 3553 future.completeExceptionally(err); 3554 return; 3555 } 3556 // Accessed by multiple threads. 3557 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3558 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3559 serverNames.stream().forEach(serverName -> { 3560 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3561 if (err2 != null) { 3562 future.completeExceptionally(unwrapCompletionException(err2)); 3563 } else { 3564 serverStates.put(serverName, serverState); 3565 } 3566 })); 3567 }); 3568 addListener( 3569 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3570 (ret, err3) -> { 3571 if (!future.isCompletedExceptionally()) { 3572 if (err3 != null) { 3573 future.completeExceptionally(err3); 3574 } else { 3575 future.complete(serverStates); 3576 } 3577 } 3578 }); 3579 }); 3580 return future; 3581 } 3582 3583 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3584 CompletableFuture<List<ServerName>> future 
= new CompletableFuture<>(); 3585 if (serverNamesList.isEmpty()) { 3586 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3587 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3588 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3589 if (err != null) { 3590 future.completeExceptionally(err); 3591 } else { 3592 future.complete(clusterMetrics.getServersName()); 3593 } 3594 }); 3595 return future; 3596 } else { 3597 List<ServerName> serverList = new ArrayList<>(); 3598 for (String regionServerName : serverNamesList) { 3599 ServerName serverName = null; 3600 try { 3601 serverName = ServerName.valueOf(regionServerName); 3602 } catch (Exception e) { 3603 future.completeExceptionally( 3604 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3605 } 3606 if (serverName == null) { 3607 future.completeExceptionally( 3608 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3609 } else { 3610 serverList.add(serverName); 3611 } 3612 } 3613 future.complete(serverList); 3614 } 3615 return future; 3616 } 3617 3618 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3619 return this.<Boolean> newAdminCaller().serverName(serverName) 3620 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3621 Boolean> adminCall(controller, stub, 3622 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3623 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3624 .call(); 3625 } 3626 3627 @Override 3628 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3629 return this.<Boolean> newMasterCaller() 3630 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3631 Boolean> call(controller, stub, 3632 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3633 (s, c, req, done) -> 
s.setBalancerRunning(c, req, done), 3634 (resp) -> resp.getPrevBalanceValue())) 3635 .call(); 3636 } 3637 3638 @Override 3639 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3640 return this.<BalanceResponse> newMasterCaller() 3641 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3642 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3643 (s, c, req, done) -> s.balance(c, req, done), 3644 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3645 .call(); 3646 } 3647 3648 @Override 3649 public CompletableFuture<Boolean> isBalancerEnabled() { 3650 return this.<Boolean> newMasterCaller() 3651 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3652 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3653 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3654 .call(); 3655 } 3656 3657 @Override 3658 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3659 return this.<Boolean> newMasterCaller() 3660 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3661 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3662 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3663 (resp) -> resp.getPrevNormalizerValue())) 3664 .call(); 3665 } 3666 3667 @Override 3668 public CompletableFuture<Boolean> isNormalizerEnabled() { 3669 return this.<Boolean> newMasterCaller() 3670 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3671 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3672 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3673 .call(); 3674 } 3675 3676 @Override 3677 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3678 return 
normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3679 } 3680 3681 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3682 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3683 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3684 } 3685 3686 @Override 3687 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3688 return this.<Boolean> newMasterCaller() 3689 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3690 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3691 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3692 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3693 (resp) -> resp.getPrevValue())) 3694 .call(); 3695 } 3696 3697 @Override 3698 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3699 return this.<Boolean> newMasterCaller() 3700 .action( 3701 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3702 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3703 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3704 .call(); 3705 } 3706 3707 @Override 3708 public CompletableFuture<Boolean> runCleanerChore() { 3709 return this.<Boolean> newMasterCaller() 3710 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3711 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3712 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3713 (resp) -> resp.getCleanerChoreRan())) 3714 .call(); 3715 } 3716 3717 @Override 3718 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3719 return this.<Boolean> newMasterCaller() 3720 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3721 Boolean> call(controller, stub, 
RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3722 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3723 .call(); 3724 } 3725 3726 @Override 3727 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3728 return this.<Boolean> newMasterCaller() 3729 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3730 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3731 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3732 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3733 .call(); 3734 } 3735 3736 @Override 3737 public CompletableFuture<Integer> runCatalogJanitor() { 3738 return this.<Integer> newMasterCaller() 3739 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3740 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3741 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3742 .call(); 3743 } 3744 3745 @Override 3746 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3747 ServiceCaller<S, R> callable) { 3748 MasterCoprocessorRpcChannelImpl channel = 3749 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3750 S stub = stubMaker.apply(channel); 3751 CompletableFuture<R> future = new CompletableFuture<>(); 3752 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3753 callable.call(stub, controller, resp -> { 3754 if (controller.failed()) { 3755 future.completeExceptionally(controller.getFailed()); 3756 } else { 3757 future.complete(resp); 3758 } 3759 }); 3760 return future; 3761 } 3762 3763 @Override 3764 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3765 ServiceCaller<S, R> callable, ServerName serverName) { 3766 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3767 
this.<Message> newServerCaller().serverName(serverName)); 3768 S stub = stubMaker.apply(channel); 3769 CompletableFuture<R> future = new CompletableFuture<>(); 3770 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3771 callable.call(stub, controller, resp -> { 3772 if (controller.failed()) { 3773 future.completeExceptionally(controller.getFailed()); 3774 } else { 3775 future.complete(resp); 3776 } 3777 }); 3778 return future; 3779 } 3780 3781 @Override 3782 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3783 return this.<List<ServerName>> newMasterCaller() 3784 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3785 List<ServerName>> call(controller, stub, 3786 RequestConverter.buildClearDeadServersRequest(servers), 3787 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3788 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3789 .call(); 3790 } 3791 3792 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3793 return this.connection.callerFactory.<T> serverRequest() 3794 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3795 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3796 .pause(pauseNs, TimeUnit.NANOSECONDS) 3797 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3798 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3799 } 3800 3801 @Override 3802 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3803 if (tableName == null) { 3804 return failedFuture(new IllegalArgumentException("Table name is null")); 3805 } 3806 CompletableFuture<Void> future = new CompletableFuture<>(); 3807 addListener(tableExists(tableName), (exist, err) -> { 3808 if (err != null) { 3809 future.completeExceptionally(err); 3810 return; 3811 } 3812 if (!exist) { 3813 future.completeExceptionally(new TableNotFoundException( 3814 "Table '" + tableName.getNameAsString() + "' does not 
exists.")); 3815 return; 3816 } 3817 addListener(getTableSplits(tableName), (splits, err1) -> { 3818 if (err1 != null) { 3819 future.completeExceptionally(err1); 3820 } else { 3821 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3822 if (err2 != null) { 3823 future.completeExceptionally(err2); 3824 } else { 3825 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3826 if (err3 != null) { 3827 future.completeExceptionally(err3); 3828 } else { 3829 future.complete(result3); 3830 } 3831 }); 3832 } 3833 }); 3834 } 3835 }); 3836 }); 3837 return future; 3838 } 3839 3840 @Override 3841 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3842 if (tableName == null) { 3843 return failedFuture(new IllegalArgumentException("Table name is null")); 3844 } 3845 CompletableFuture<Void> future = new CompletableFuture<>(); 3846 addListener(tableExists(tableName), (exist, err) -> { 3847 if (err != null) { 3848 future.completeExceptionally(err); 3849 return; 3850 } 3851 if (!exist) { 3852 future.completeExceptionally(new TableNotFoundException( 3853 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3854 return; 3855 } 3856 addListener(setTableReplication(tableName, false), (result, err2) -> { 3857 if (err2 != null) { 3858 future.completeExceptionally(err2); 3859 } else { 3860 future.complete(result); 3861 } 3862 }); 3863 }); 3864 return future; 3865 } 3866 3867 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3868 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3869 addListener( 3870 getRegions(tableName).thenApply(regions -> regions.stream() 3871 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3872 (regions, err2) -> { 3873 if (err2 != null) { 3874 future.completeExceptionally(err2); 3875 return; 3876 } 3877 if (regions.size() == 1) { 3878 future.complete(null); 3879 } else { 3880 byte[][] splits = new 
byte[regions.size() - 1][]; 3881 for (int i = 1; i < regions.size(); i++) { 3882 splits[i - 1] = regions.get(i).getStartKey(); 3883 } 3884 future.complete(splits); 3885 } 3886 }); 3887 return future; 3888 } 3889 3890 /** 3891 * Connect to peer and check the table descriptor on peer: 3892 * <ol> 3893 * <li>Create the same table on peer when not exist.</li> 3894 * <li>Throw an exception if the table already has replication enabled on any of the column 3895 * families.</li> 3896 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3897 * </ol> 3898 * @param tableName name of the table to sync to the peer 3899 * @param splits table split keys 3900 */ 3901 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3902 byte[][] splits) { 3903 CompletableFuture<Void> future = new CompletableFuture<>(); 3904 addListener(listReplicationPeers(), (peers, err) -> { 3905 if (err != null) { 3906 future.completeExceptionally(err); 3907 return; 3908 } 3909 if (peers == null || peers.size() <= 0) { 3910 future.completeExceptionally( 3911 new IllegalArgumentException("Found no peer cluster for replication.")); 3912 return; 3913 } 3914 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3915 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3916 .forEach(peer -> { 3917 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3918 }); 3919 addListener( 3920 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3921 (result, err2) -> { 3922 if (err2 != null) { 3923 future.completeExceptionally(err2); 3924 } else { 3925 future.complete(result); 3926 } 3927 }); 3928 }); 3929 return future; 3930 } 3931 3932 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3933 ReplicationPeerDescription peer) { 3934 Configuration peerConf; 3935 try { 3936 peerConf = ReplicationPeerConfigUtil 3937 
.getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig()); 3938 } catch (IOException e) { 3939 return failedFuture(e); 3940 } 3941 URI connectionUri = 3942 ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey()); 3943 CompletableFuture<Void> future = new CompletableFuture<>(); 3944 addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> { 3945 if (err != null) { 3946 future.completeExceptionally(err); 3947 return; 3948 } 3949 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3950 if (err1 != null) { 3951 future.completeExceptionally(err1); 3952 return; 3953 } 3954 AsyncAdmin peerAdmin = conn.getAdmin(); 3955 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3956 if (err2 != null) { 3957 future.completeExceptionally(err2); 3958 return; 3959 } 3960 if (!exist) { 3961 CompletableFuture<Void> createTableFuture = null; 3962 if (splits == null) { 3963 createTableFuture = peerAdmin.createTable(tableDesc); 3964 } else { 3965 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3966 } 3967 addListener(createTableFuture, (result, err3) -> { 3968 if (err3 != null) { 3969 future.completeExceptionally(err3); 3970 } else { 3971 future.complete(result); 3972 } 3973 }); 3974 } else { 3975 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3976 (result, err4) -> { 3977 if (err4 != null) { 3978 future.completeExceptionally(err4); 3979 } else { 3980 future.complete(result); 3981 } 3982 }); 3983 } 3984 }); 3985 }); 3986 }); 3987 return future; 3988 } 3989 3990 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3991 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3992 CompletableFuture<Void> future = new CompletableFuture<>(); 3993 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3994 if (err != null) { 3995 future.completeExceptionally(err); 
3996 return; 3997 } 3998 if (peerTableDesc == null) { 3999 future.completeExceptionally( 4000 new IllegalArgumentException("Failed to get table descriptor for table " 4001 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 4002 return; 4003 } 4004 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 4005 future.completeExceptionally(new IllegalArgumentException( 4006 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 4007 + ", but the table descriptors are not same when compared with source cluster." 4008 + " Thus can not enable the table's replication switch.")); 4009 return; 4010 } 4011 future.complete(null); 4012 }); 4013 return future; 4014 } 4015 4016 /** 4017 * Set the table's replication switch if the table's replication switch is already not set. 4018 * @param tableName name of the table 4019 * @param enableRep is replication switch enable or disable 4020 */ 4021 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 4022 CompletableFuture<Void> future = new CompletableFuture<>(); 4023 addListener(getDescriptor(tableName), (tableDesc, err) -> { 4024 if (err != null) { 4025 future.completeExceptionally(err); 4026 return; 4027 } 4028 if (!tableDesc.matchReplicationScope(enableRep)) { 4029 int scope = 4030 enableRep ? 
HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 4031 TableDescriptor newTableDesc = 4032 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 4033 addListener(modifyTable(newTableDesc), (result, err2) -> { 4034 if (err2 != null) { 4035 future.completeExceptionally(err2); 4036 } else { 4037 future.complete(result); 4038 } 4039 }); 4040 } else { 4041 future.complete(null); 4042 } 4043 }); 4044 return future; 4045 } 4046 4047 @Override 4048 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 4049 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 4050 request.setPeerId(peerId); 4051 return this.<Boolean> newMasterCaller() 4052 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 4053 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 4054 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 4055 resp -> resp.getIsEnabled())) 4056 .call(); 4057 } 4058 4059 private void waitUntilAllReplicationPeerModificationProceduresDone( 4060 CompletableFuture<Boolean> future, boolean prevOn, int retries) { 4061 CompletableFuture<List<ProcedureProtos.Procedure>> callFuture = 4062 this.<List<ProcedureProtos.Procedure>> newMasterCaller() 4063 .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest, 4064 GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call( 4065 controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), 4066 (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), 4067 resp -> resp.getProcedureList())) 4068 .call(); 4069 addListener(callFuture, (r, e) -> { 4070 if (e != null) { 4071 future.completeExceptionally(e); 4072 } else if (r.isEmpty()) { 4073 // we are done 4074 future.complete(prevOn); 4075 } else { 4076 // retry later to see if the procedures are done 4077 
retryTimer.newTimeout( 4078 t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), 4079 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 4080 } 4081 }); 4082 } 4083 4084 @Override 4085 public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, 4086 boolean drainProcedures) { 4087 ReplicationPeerModificationSwitchRequest request = 4088 ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); 4089 CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller() 4090 .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest, 4091 ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request, 4092 (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), 4093 resp -> resp.getPreviousValue())) 4094 .call(); 4095 // if we do not need to wait all previous peer modification procedure done, or we are enabling 4096 // peer modification, just return here. 4097 if (!drainProcedures || on) { 4098 return callFuture; 4099 } 4100 // otherwise we need to wait until all previous peer modification procedure done 4101 CompletableFuture<Boolean> future = new CompletableFuture<>(); 4102 addListener(callFuture, (prevOn, err) -> { 4103 if (err != null) { 4104 future.completeExceptionally(err); 4105 return; 4106 } 4107 // even if the previous state is disabled, we still need to wait here, as there could be 4108 // another client thread which called this method just before us and have already changed the 4109 // state to off, but there are still peer modification procedures not finished, so we should 4110 // also wait here. 
4111 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 4112 }); 4113 return future; 4114 } 4115 4116 @Override 4117 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 4118 return this.<Boolean> newMasterCaller() 4119 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 4120 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 4121 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 4122 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 4123 (resp) -> resp.getEnabled())) 4124 .call(); 4125 } 4126 4127 @Override 4128 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 4129 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 4130 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 4131 if (err != null) { 4132 future.completeExceptionally(err); 4133 return; 4134 } 4135 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 4136 locations.stream().filter(l -> l.getRegion() != null) 4137 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 4138 .collect(Collectors.groupingBy(l -> l.getServerName(), 4139 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 4140 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 4141 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 4142 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 4143 futures 4144 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 4145 if (err2 != null) { 4146 future.completeExceptionally(unwrapCompletionException(err2)); 4147 } else { 4148 aggregator.append(stats); 4149 } 4150 })); 4151 } 4152 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 4153 (ret, err3) -> { 4154 if (err3 != null) { 4155 
future.completeExceptionally(unwrapCompletionException(err3)); 4156 } else { 4157 future.complete(aggregator.sum()); 4158 } 4159 }); 4160 }); 4161 return future; 4162 } 4163 4164 @Override 4165 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 4166 boolean preserveSplits) { 4167 CompletableFuture<Void> future = new CompletableFuture<>(); 4168 addListener(tableExists(tableName), (exist, err) -> { 4169 if (err != null) { 4170 future.completeExceptionally(err); 4171 return; 4172 } 4173 if (!exist) { 4174 future.completeExceptionally(new TableNotFoundException(tableName)); 4175 return; 4176 } 4177 addListener(tableExists(newTableName), (exist1, err1) -> { 4178 if (err1 != null) { 4179 future.completeExceptionally(err1); 4180 return; 4181 } 4182 if (exist1) { 4183 future.completeExceptionally(new TableExistsException(newTableName)); 4184 return; 4185 } 4186 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 4187 if (err2 != null) { 4188 future.completeExceptionally(err2); 4189 return; 4190 } 4191 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 4192 if (preserveSplits) { 4193 addListener(getTableSplits(tableName), (splits, err3) -> { 4194 if (err3 != null) { 4195 future.completeExceptionally(err3); 4196 } else { 4197 addListener( 4198 splits != null ? 
createTable(newTableDesc, splits) : createTable(newTableDesc), 4199 (result, err4) -> { 4200 if (err4 != null) { 4201 future.completeExceptionally(err4); 4202 } else { 4203 future.complete(result); 4204 } 4205 }); 4206 } 4207 }); 4208 } else { 4209 addListener(createTable(newTableDesc), (result, err5) -> { 4210 if (err5 != null) { 4211 future.completeExceptionally(err5); 4212 } else { 4213 future.complete(result); 4214 } 4215 }); 4216 } 4217 }); 4218 }); 4219 }); 4220 return future; 4221 } 4222 4223 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 4224 List<RegionInfo> hris) { 4225 return this.<CacheEvictionStats> newAdminCaller() 4226 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 4227 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 4228 RequestConverter.buildClearRegionBlockCacheRequest(hris), 4229 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 4230 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 4231 .serverName(serverName).call(); 4232 } 4233 4234 @Override 4235 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 4236 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4237 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 4238 Boolean> call(controller, stub, 4239 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 4240 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 4241 resp -> resp.getPreviousRpcThrottleEnabled())) 4242 .call(); 4243 return future; 4244 } 4245 4246 @Override 4247 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 4248 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4249 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 4250 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 4251 (s, c, req, done) -> 
s.isRpcThrottleEnabled(c, req, done), 4252 resp -> resp.getRpcThrottleEnabled())) 4253 .call(); 4254 return future; 4255 } 4256 4257 @Override 4258 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 4259 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4260 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 4261 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 4262 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 4263 .build(), 4264 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 4265 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 4266 .call(); 4267 return future; 4268 } 4269 4270 @Override 4271 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 4272 return this.<Map<TableName, Long>> newMasterCaller() 4273 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 4274 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4275 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4276 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4277 resp -> resp.getSizesList().stream().collect(Collectors 4278 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4279 .call(); 4280 } 4281 4282 @Override 4283 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4284 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4285 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4286 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4287 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4288 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4289 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4290 resp -> resp.getSnapshotsList().stream() 4291 .collect(Collectors.toMap(snapshot -> 
ProtobufUtil.toTableName(snapshot.getTableName()), 4292 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4293 .serverName(serverName).call(); 4294 } 4295 4296 private CompletableFuture<SpaceQuotaSnapshot> 4297 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4298 return this.<SpaceQuotaSnapshot> newMasterCaller() 4299 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4300 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4301 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4302 .call(); 4303 } 4304 4305 @Override 4306 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4307 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4308 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4309 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4310 } 4311 4312 @Override 4313 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4314 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4315 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4316 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4317 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4318 } 4319 4320 @Override 4321 public CompletableFuture<Void> grant(UserPermission userPermission, 4322 boolean mergeExistingPermissions) { 4323 return this.<Void> newMasterCaller() 4324 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4325 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4326 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4327 .call(); 4328 } 4329 4330 @Override 4331 public CompletableFuture<Void> 
revoke(UserPermission userPermission) { 4332 return this.<Void> newMasterCaller() 4333 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4334 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4335 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4336 .call(); 4337 } 4338 4339 @Override 4340 public CompletableFuture<List<UserPermission>> 4341 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4342 return this.<List<UserPermission>> newMasterCaller() 4343 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4344 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4345 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4346 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4347 resp -> resp.getUserPermissionList().stream() 4348 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4349 .collect(Collectors.toList()))) 4350 .call(); 4351 } 4352 4353 @Override 4354 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4355 List<Permission> permissions) { 4356 return this.<List<Boolean>> newMasterCaller() 4357 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4358 List<Boolean>> call(controller, stub, 4359 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4360 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4361 resp -> resp.getHasUserPermissionList())) 4362 .call(); 4363 } 4364 4365 @Override 4366 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4367 return this.<Boolean> newMasterCaller() 4368 .action((controller, stub) -> this.call(controller, stub, 4369 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4370 MasterService.Interface::switchSnapshotCleanup, 4371 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4372 .call(); 
4373 } 4374 4375 @Override 4376 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4377 return this.<Boolean> newMasterCaller() 4378 .action((controller, stub) -> this.call(controller, stub, 4379 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4380 MasterService.Interface::isSnapshotCleanupEnabled, 4381 IsSnapshotCleanupEnabledResponse::getEnabled)) 4382 .call(); 4383 } 4384 4385 @Override 4386 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4387 return this.<Void> newMasterCaller() 4388 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4389 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4390 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4391 .call(); 4392 } 4393 4394 @Override 4395 public CompletableFuture<Void> addRSGroup(String groupName) { 4396 return this.<Void> newMasterCaller() 4397 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4398 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4399 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4400 .call(); 4401 } 4402 4403 @Override 4404 public CompletableFuture<Void> removeRSGroup(String groupName) { 4405 return this.<Void> newMasterCaller() 4406 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4407 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4408 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4409 .call(); 4410 } 4411 4412 @Override 4413 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4414 BalanceRequest request) { 4415 return this.<BalanceResponse> newMasterCaller() 4416 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4417 BalanceResponse> call(controller, stub, 4418 
ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4419 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4420 .call(); 4421 } 4422 4423 @Override 4424 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4425 return this.<List<RSGroupInfo>> newMasterCaller() 4426 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4427 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4428 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4429 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4430 .call(); 4431 } 4432 4433 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4434 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4435 final String logType) { 4436 if (CollectionUtils.isEmpty(serverNames)) { 4437 return CompletableFuture.completedFuture(Collections.emptyList()); 4438 } 4439 return CompletableFuture.supplyAsync(() -> serverNames 4440 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4441 filterParams, limit, logType)) 4442 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4443 } 4444 4445 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4446 Map<String, Object> filterParams, int limit, String logType) { 4447 return this.<List<LogEntry>> newAdminCaller() 4448 .action((controller, stub) -> this.adminCall(controller, stub, 4449 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4450 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4451 .serverName(serverName).call(); 4452 } 4453 4454 @Override 4455 public CompletableFuture<List<Boolean>> 4456 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4457 if (CollectionUtils.isEmpty(serverNames)) { 4458 return 
CompletableFuture.completedFuture(Collections.emptyList()); 4459 } 4460 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4461 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4462 return convertToFutureOfList(clearSlowLogResponseList); 4463 } 4464 4465 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4466 return this.<Boolean> newAdminCaller() 4467 .action((controller, stub) -> this.adminCall(controller, stub, 4468 RequestConverter.buildClearSlowLogResponseRequest(), 4469 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4470 .serverName(serverName).call(); 4471 } 4472 4473 private static <T> CompletableFuture<List<T>> 4474 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4475 CompletableFuture<Void> allDoneFuture = 4476 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4477 return allDoneFuture 4478 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4479 } 4480 4481 @Override 4482 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4483 return this.<List<TableName>> newMasterCaller() 4484 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4485 List<TableName>> call(controller, stub, 4486 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4487 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4488 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4489 .call(); 4490 } 4491 4492 @Override 4493 public CompletableFuture<Pair<List<String>, List<TableName>>> 4494 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4495 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4496 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4497 
GetConfiguredNamespacesAndTablesInRSGroupResponse, 4498 Pair<List<String>, List<TableName>>> call(controller, stub, 4499 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4500 .build(), 4501 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4502 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4503 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4504 .call(); 4505 } 4506 4507 @Override 4508 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4509 return this.<RSGroupInfo> newMasterCaller() 4510 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4511 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4512 GetRSGroupInfoOfServerRequest.newBuilder() 4513 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4514 .setPort(hostPort.getPort()).build()) 4515 .build(), 4516 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4517 resp -> resp.hasRSGroupInfo() ? 
ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4518 .call(); 4519 } 4520 4521 @Override 4522 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4523 return this.<Void> newMasterCaller() 4524 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4525 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4526 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4527 .call(); 4528 } 4529 4530 @Override 4531 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4532 CompletableFuture<Void> future = new CompletableFuture<>(); 4533 for (TableName tableName : tables) { 4534 addListener(tableExists(tableName), (exist, err) -> { 4535 if (err != null) { 4536 future.completeExceptionally(err); 4537 return; 4538 } 4539 if (!exist) { 4540 future.completeExceptionally(new TableNotFoundException(tableName)); 4541 return; 4542 } 4543 }); 4544 } 4545 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4546 if (err != null) { 4547 future.completeExceptionally(err); 4548 return; 4549 } 4550 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4551 future.complete(null); 4552 return; 4553 } 4554 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4555 for (TableDescriptor td : tableDescriptions) { 4556 newTableDescriptors 4557 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4558 } 4559 addListener( 4560 CompletableFuture.allOf( 4561 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4562 (v, e) -> { 4563 if (e != null) { 4564 future.completeExceptionally(e); 4565 } else { 4566 future.complete(v); 4567 } 4568 }); 4569 }); 4570 return future; 4571 } 4572 4573 @Override 4574 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4575 return this.<RSGroupInfo> newMasterCaller() 4576 .action((controller, 
stub) -> this.<GetRSGroupInfoOfTableRequest, 4577 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4578 GetRSGroupInfoOfTableRequest.newBuilder() 4579 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4580 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4581 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4582 .call(); 4583 } 4584 4585 @Override 4586 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4587 return this.<RSGroupInfo> newMasterCaller() 4588 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4589 RSGroupInfo> call(controller, stub, 4590 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4591 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4592 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4593 .call(); 4594 } 4595 4596 @Override 4597 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4598 return this.<Void> newMasterCaller() 4599 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4600 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4601 .setNewRsgroupName(newName).build(), 4602 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4603 .call(); 4604 } 4605 4606 @Override 4607 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4608 Map<String, String> configuration) { 4609 UpdateRSGroupConfigRequest.Builder request = 4610 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4611 if (configuration != null) { 4612 configuration.entrySet().forEach(e -> request.addConfiguration( 4613 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4614 } 4615 return this.<Void> newMasterCaller() 4616 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4617 Void> 
call(controller, stub, request.build(), 4618 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4619 .call(); 4620 } 4621 4622 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4623 return this.<List<LogEntry>> newMasterCaller() 4624 .action((controller, stub) -> this.call(controller, stub, 4625 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4626 ProtobufUtil::toBalancerDecisionResponse)) 4627 .call(); 4628 } 4629 4630 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4631 return this.<List<LogEntry>> newMasterCaller() 4632 .action((controller, stub) -> this.call(controller, stub, 4633 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4634 ProtobufUtil::toBalancerRejectionResponse)) 4635 .call(); 4636 } 4637 4638 @Override 4639 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4640 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4641 if (logType == null || serverType == null) { 4642 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4643 } 4644 switch (logType) { 4645 case "SLOW_LOG": 4646 case "LARGE_LOG": 4647 if (ServerType.MASTER.equals(serverType)) { 4648 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4649 } 4650 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4651 case "BALANCER_DECISION": 4652 if (ServerType.REGION_SERVER.equals(serverType)) { 4653 throw new IllegalArgumentException( 4654 "Balancer Decision logs are not maintained by HRegionServer"); 4655 } 4656 return getBalancerDecisions(limit); 4657 case "BALANCER_REJECTION": 4658 if (ServerType.REGION_SERVER.equals(serverType)) { 4659 throw new IllegalArgumentException( 4660 "Balancer Rejection logs are not maintained by HRegionServer"); 4661 } 4662 return 
getBalancerRejections(limit); 4663 default: 4664 return CompletableFuture.completedFuture(Collections.emptyList()); 4665 } 4666 } 4667 4668 @Override 4669 public CompletableFuture<Void> flushMasterStore() { 4670 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4671 return this.<Void> newMasterCaller() 4672 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4673 Void> call(controller, stub, request.build(), 4674 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4675 .call(); 4676 } 4677 4678 @Override 4679 public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) { 4680 GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder(); 4681 return this.<List<String>> newAdminCaller() 4682 .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse, 4683 List<String>> adminCall(controller, stub, request.build(), 4684 (s, c, req, done) -> s.getCachedFilesList(c, req, done), 4685 resp -> resp.getCachedFilesList())) 4686 .serverName(serverName).call(); 4687 } 4688 4689 @Override 4690 public CompletableFuture<Void> restoreBackupSystemTable(String snapshotName) { 4691 MasterProtos.RestoreBackupSystemTableRequest request = 4692 MasterProtos.RestoreBackupSystemTableRequest.newBuilder().setSnapshotName(snapshotName) 4693 .build(); 4694 return this.<MasterProtos.RestoreBackupSystemTableRequest, 4695 MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request, 4696 MasterService.Interface::restoreBackupSystemTable, 4697 MasterProtos.RestoreBackupSystemTableResponse::getProcId, 4698 new RestoreBackupSystemTableProcedureBiConsumer()); 4699 } 4700}