001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.net.URI; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.Set; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.function.BiConsumer; 043import java.util.function.Consumer; 044import java.util.function.Function; 045import java.util.function.Supplier; 046import java.util.regex.Pattern; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 052import org.apache.hadoop.hbase.CatalogFamilyFormat; 053import org.apache.hadoop.hbase.ClientMetaTableAccessor; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.HRegionLocation; 060import org.apache.hadoop.hbase.NamespaceDescriptor; 061import org.apache.hadoop.hbase.RegionLocations; 062import org.apache.hadoop.hbase.RegionMetrics; 063import org.apache.hadoop.hbase.RegionMetricsBuilder; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableExistsException; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotDisabledException; 068import org.apache.hadoop.hbase.TableNotEnabledException; 069import org.apache.hadoop.hbase.TableNotFoundException; 070import org.apache.hadoop.hbase.UnknownRegionException; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.Scan.ReadType; 075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 076import org.apache.hadoop.hbase.client.replication.TableCFs; 077import org.apache.hadoop.hbase.client.security.SecurityCapability; 078import org.apache.hadoop.hbase.exceptions.DeserializationException; 079import org.apache.hadoop.hbase.ipc.HBaseRpcController; 080import org.apache.hadoop.hbase.net.Address; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 086import org.apache.hadoop.hbase.replication.ReplicationException; 087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 089import org.apache.hadoop.hbase.replication.SyncReplicationState; 090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 092import org.apache.hadoop.hbase.security.access.Permission; 093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 094import org.apache.hadoop.hbase.security.access.UserPermission; 095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 100import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 101import org.apache.hadoop.hbase.util.Pair; 102import org.apache.hadoop.hbase.util.Strings; 103import org.apache.yetus.audience.InterfaceAudience; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 108import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 109import org.apache.hbase.thirdparty.com.google.protobuf.Message; 110import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 111import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 112import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 113import org.apache.hbase.thirdparty.io.netty.util.Timeout; 114import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 116 117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 118import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshHFilesRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshHFilesResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshMetaRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshMetaResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 357import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 358import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 359import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 360import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 361import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 362import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 363import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 364import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 365import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 366import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 367import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 368 369/** 370 * The implementation of AsyncAdmin. 371 * <p> 372 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 373 * be finished inside the rpc framework thread, which means that the callbacks registered to the 374 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 375 * this class should not try to do time consuming tasks in the callbacks. 376 * @since 2.0.0 377 * @see AsyncHBaseAdmin 378 * @see AsyncConnection#getAdmin() 379 * @see AsyncConnection#getAdminBuilder() 380 */ 381@InterfaceAudience.Private 382class RawAsyncHBaseAdmin implements AsyncAdmin { 383 384 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 385 386 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncHBaseAdmin.class); 387 388 private final AsyncConnectionImpl connection; 389 390 private final HashedWheelTimer retryTimer; 391 392 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 393 394 private final long rpcTimeoutNs; 395 396 private final long operationTimeoutNs; 397 398 private final long pauseNs; 399 400 private final long pauseNsForServerOverloaded; 401 402 private final int maxAttempts; 403 404 private final int startLogErrorsCnt; 405 406 private final NonceGenerator ng; 407 408 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 409 AsyncAdminBuilderBase builder) { 410 this.connection = connection; 411 this.retryTimer = retryTimer; 412 this.metaTable = connection.getTable(META_TABLE_NAME); 413 this.rpcTimeoutNs = builder.rpcTimeoutNs; 414 this.operationTimeoutNs = builder.operationTimeoutNs; 415 this.pauseNs = builder.pauseNs; 416 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 417 LOG.warn( 418 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 419 + " the normal pause value {} ms, use the greater one instead", 420 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 421 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 422 this.pauseNsForServerOverloaded = builder.pauseNs; 423 } else { 424 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 425 } 426 this.maxAttempts = builder.maxAttempts; 427 this.startLogErrorsCnt = builder.startLogErrorsCnt; 428 this.ng = connection.getNonceGenerator(); 429 } 430 431 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 432 return this.connection.callerFactory.<T> masterRequest() 433 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 434 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 435 .pause(pauseNs, TimeUnit.NANOSECONDS) 436 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 437 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 438 } 439 440 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 441 return this.connection.callerFactory.<T> adminRequest() 442 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 443 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 444 .pause(pauseNs, TimeUnit.NANOSECONDS) 445 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 446 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 447 } 448 449 @FunctionalInterface 450 private interface MasterRpcCall<RESP, REQ> { 451 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 452 RpcCallback<RESP> done); 453 } 454 455 @FunctionalInterface 456 private interface AdminRpcCall<RESP, REQ> { 457 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 458 RpcCallback<RESP> done); 459 } 460 461 @FunctionalInterface 462 private interface Converter<D, S> { 463 D convert(S src) throws IOException; 464 } 465 466 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 467 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 468 Converter<RESP, PRESP> respConverter) { 469 CompletableFuture<RESP> future = new CompletableFuture<>(); 470 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 471 472 @Override 473 public void run(PRESP resp) { 474 if (controller.failed()) { 475 future.completeExceptionally(controller.getFailed()); 476 } else { 477 try { 478 future.complete(respConverter.convert(resp)); 479 } catch (IOException e) { 480 future.completeExceptionally(e); 481 } 482 } 483 } 484 }); 485 return future; 486 } 487 488 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 489 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 490 Converter<RESP, PRESP> respConverter) { 491 CompletableFuture<RESP> future = new CompletableFuture<>(); 492 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 493 494 @Override 495 public void run(PRESP resp) { 496 if (controller.failed()) { 497 future.completeExceptionally(controller.getFailed()); 498 } else { 499 try { 500 future.complete(respConverter.convert(resp)); 501 } catch (IOException e) { 502 future.completeExceptionally(e); 503 } 504 } 505 } 506 }); 507 return future; 508 } 509 510 /** 511 * short-circuit call for 512 * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)} 513 * by ignoring procedure result 514 */ 515 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 516 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 517 ProcedureBiConsumer<Void> consumer) { 518 return procedureCall(preq, rpcCall, respConverter, result -> null, consumer); 519 } 520 521 /** 522 * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, 523 * ProcedureBiConsumer) by skip setting priority for request 524 */ 525 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq, 526 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 527 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 528 return procedureCall(b -> { 529 }, preq, rpcCall, respConverter, resultConverter, consumer); 530 } 531 532 /** 533 * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter, 534 * ProcedureBiConsumer) by ignoring procedure result 535 */ 536 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 537 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 538 ProcedureBiConsumer<Void> consumer) { 539 return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer); 540 } 541 542 /** 543 * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, 544 * ProcedureBiConsumer) by skip setting priority for request 545 */ 546 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq, 547 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 548 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 549 return procedureCall(b -> b.tableName(tableName), preq, rpcCall, respConverter, resultConverter, 550 consumer); 551 } 552 553 /** 554 * @param <PREQ> type of request 555 * @param <PRESP> type of response 556 * @param <PRES> type of procedure call result 557 * @param prioritySetter prioritySetter set priority by table for request 558 * @param preq procedure call request 559 * @param rpcCall procedure rpc call 560 * @param respConverter extract proc id from procedure call response 561 * @param resultConverter extract result from procedure call result 562 * @param consumer action performs on result 563 * @return procedure call result, null if procedure is void 564 */ 565 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall( 566 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 567 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 568 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 569 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller() 570 .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter)); 571 prioritySetter.accept(builder); 572 CompletableFuture<Long> procFuture = builder.call(); 573 CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter); 574 addListener(future, consumer); 575 return future; 576 } 577 578 @Override 579 public CompletableFuture<Boolean> tableExists(TableName tableName) { 580 if (TableName.isMetaTableName(tableName)) { 581 return CompletableFuture.completedFuture(true); 582 } 583 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 584 } 585 586 @Override 587 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 588 return getTableDescriptors( 589 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 590 } 591 592 /** 593 * {@link #listTableDescriptors(boolean)} 594 */ 595 @Override 596 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 597 boolean includeSysTables) { 598 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 599 + "use listTableDescriptors(boolean) instead"); 600 return getTableDescriptors( 601 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 602 } 603 604 @Override 605 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 606 Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, " 607 + "use listTableDescriptors(boolean) instead"); 608 if (tableNames.isEmpty()) { 609 return CompletableFuture.completedFuture(Collections.emptyList()); 610 } 611 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 612 } 613 614 private CompletableFuture<List<TableDescriptor>> 615 getTableDescriptors(GetTableDescriptorsRequest request) { 616 return this.<List<TableDescriptor>> newMasterCaller() 617 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 618 List<TableDescriptor>> call(controller, stub, request, 619 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 620 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 621 .call(); 622 } 623 624 @Override 625 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 626 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 627 } 628 629 @Override 630 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 631 boolean includeSysTables) { 632 Preconditions.checkNotNull(pattern, 633 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 634 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 635 } 636 637 @Override 638 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 639 return this.<List<TableName>> newMasterCaller() 640 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 641 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 642 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 643 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 644 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 645 .call(); 646 } 647 648 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 649 return this.<List<TableName>> newMasterCaller() 650 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 651 List<TableName>> call(controller, stub, request, 652 (s, c, req, done) -> s.getTableNames(c, req, done), 653 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 654 .call(); 655 } 656 657 @Override 658 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 659 return this.<List<TableDescriptor>> newMasterCaller() 660 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 661 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 662 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 663 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 664 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 665 .call(); 666 } 667 668 @Override 669 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 670 return this.<List<TableDescriptor>> newMasterCaller() 671 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 672 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 673 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 674 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 675 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 676 .call(); 677 } 678 679 @Override 680 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 681 return this.<List<TableName>> newMasterCaller() 682 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 683 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 684 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 685 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 686 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 687 .call(); 688 } 689 690 @Override 691 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 692 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 693 addListener(this.<List<TableSchema>> newMasterCaller().tableName(tableName) 694 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 695 List<TableSchema>> call(controller, stub, 696 RequestConverter.buildGetTableDescriptorsRequest(tableName), 697 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 698 (resp) -> resp.getTableSchemaList())) 699 .call(), (tableSchemas, error) -> { 700 if (error != null) { 701 future.completeExceptionally(error); 702 return; 703 } 704 if (!tableSchemas.isEmpty()) { 705 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 706 } else { 707 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 708 } 709 }); 710 return future; 711 } 712 713 @Override 714 public CompletableFuture<Void> createTable(TableDescriptor desc) { 715 return createTable(desc.getTableName(), 716 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 717 } 718 719 @Override 720 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 721 int numRegions) { 722 try { 723 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 724 } catch (IllegalArgumentException e) { 725 return failedFuture(e); 726 } 727 } 728 729 @Override 730 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 731 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 732 + " use createTable(TableDescriptor) instead"); 733 try { 734 verifySplitKeys(splitKeys); 735 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 736 splitKeys, ng.getNonceGroup(), ng.newNonce())); 737 } catch (IllegalArgumentException e) { 738 return failedFuture(e); 739 } 740 } 741 742 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 743 Preconditions.checkNotNull(tableName, "table name is null"); 744 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 745 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 746 new CreateTableProcedureBiConsumer(tableName)); 747 } 748 749 @Override 750 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 751 return modifyTable(desc, true); 752 } 753 754 @Override 755 public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) { 756 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 757 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 758 ng.newNonce(), reopenRegions), 759 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 760 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 761 } 762 763 @Override 764 public CompletableFuture<Void> reopenTableRegions(TableName tableName) { 765 return reopenTableRegions(tableName, Collections.emptyList()); 766 } 767 768 @Override 769 public CompletableFuture<Void> reopenTableRegions(TableName tableName, List<RegionInfo> regions) { 770 List<byte[]> regionNames = regions.stream().map(RegionInfo::getRegionName).toList(); 771 return this.<ReopenTableRegionsRequest, ReopenTableRegionsResponse> procedureCall(tableName, 772 RequestConverter.buildReopenTableRegionsRequest(tableName, regionNames, ng.getNonceGroup(), 773 ng.newNonce()), 774 (s, c, req, done) -> s.reopenTableRegions(c, req, done), (resp) -> resp.getProcId(), 775 new ReopenTableRegionsProcedureBiConsumer(this, tableName)); 776 } 777 778 @Override 779 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 780 return this.<ModifyTableStoreFileTrackerRequest, 781 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 782 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 783 ng.getNonceGroup(), ng.newNonce()), 784 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 785 (resp) -> resp.getProcId(), 786 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 787 } 788 789 @Override 790 public CompletableFuture<Void> deleteTable(TableName tableName) { 791 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 792 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 793 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 794 new DeleteTableProcedureBiConsumer(tableName)); 795 } 796 797 @Override 798 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 799 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 800 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 801 ng.newNonce()), 802 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 803 new TruncateTableProcedureBiConsumer(tableName)); 804 } 805 806 @Override 807 public CompletableFuture<Void> enableTable(TableName tableName) { 808 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 809 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 810 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 811 new EnableTableProcedureBiConsumer(tableName)); 812 } 813 814 @Override 815 public CompletableFuture<Void> disableTable(TableName tableName) { 816 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 817 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 818 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 819 new DisableTableProcedureBiConsumer(tableName)); 820 } 821 822 /** 823 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 824 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 825 * targetState). 826 */ 827 private static CompletableFuture<Boolean> completeCheckTableState( 828 CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error, 829 TableState.State targetState, TableName tableName) { 830 if (error != null) { 831 future.completeExceptionally(error); 832 } else { 833 if (tableState.isPresent()) { 834 future.complete(tableState.get().inStates(targetState)); 835 } else { 836 future.completeExceptionally(new TableNotFoundException(tableName)); 837 } 838 } 839 return future; 840 } 841 842 @Override 843 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 844 if (TableName.isMetaTableName(tableName)) { 845 return CompletableFuture.completedFuture(true); 846 } 847 CompletableFuture<Boolean> future = new CompletableFuture<>(); 848 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 849 (tableState, error) -> { 850 completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName); 851 }); 852 return future; 853 } 854 855 @Override 856 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 857 if (TableName.isMetaTableName(tableName)) { 858 return CompletableFuture.completedFuture(false); 859 } 860 CompletableFuture<Boolean> future = new CompletableFuture<>(); 861 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 862 (tableState, error) -> { 863 completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName); 864 }); 865 return future; 866 } 867 868 @Override 869 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 870 if (TableName.isMetaTableName(tableName)) { 871 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 872 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 873 } 874 CompletableFuture<Boolean> future = new CompletableFuture<>(); 875 addListener(isTableEnabled(tableName), (enabled, error) -> { 876 if (error != null) { 877 if (error instanceof TableNotFoundException) { 878 future.complete(false); 879 } else { 880 future.completeExceptionally(error); 881 } 882 return; 883 } 884 if (!enabled) { 885 future.complete(false); 886 } else { 887 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 888 (locations, error1) -> { 889 if (error1 != null) { 890 future.completeExceptionally(error1); 891 return; 892 } 893 List<HRegionLocation> notDeployedRegions = locations.stream() 894 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 895 if (notDeployedRegions.size() > 0) { 896 if (LOG.isDebugEnabled()) { 897 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 898 } 899 future.complete(false); 900 return; 901 } 902 future.complete(true); 903 }); 904 } 905 }); 906 return future; 907 } 908 909 @Override 910 public CompletableFuture<Void> addColumnFamily(TableName tableName, 911 ColumnFamilyDescriptor columnFamily) { 912 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 913 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 914 ng.newNonce()), 915 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 916 new AddColumnFamilyProcedureBiConsumer(tableName)); 917 } 918 919 @Override 920 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 921 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 922 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 923 ng.newNonce()), 924 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 925 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 926 } 927 928 @Override 929 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 930 ColumnFamilyDescriptor columnFamily) { 931 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 932 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 933 ng.newNonce()), 934 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 935 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 936 } 937 938 @Override 939 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 940 byte[] family, String dstSFT) { 941 return this.<ModifyColumnStoreFileTrackerRequest, 942 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 943 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 944 ng.getNonceGroup(), ng.newNonce()), 945 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 946 (resp) -> resp.getProcId(), 947 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 948 } 949 950 @Override 951 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 952 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 953 RequestConverter.buildCreateNamespaceRequest(descriptor), 954 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 955 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 956 } 957 958 @Override 959 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 960 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 961 RequestConverter.buildModifyNamespaceRequest(descriptor), 962 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 963 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 964 } 965 966 @Override 967 public CompletableFuture<Void> deleteNamespace(String name) { 968 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 969 RequestConverter.buildDeleteNamespaceRequest(name), 970 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 971 new DeleteNamespaceProcedureBiConsumer(name)); 972 } 973 974 @Override 975 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 976 return this.<NamespaceDescriptor> newMasterCaller() 977 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 978 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 979 RequestConverter.buildGetNamespaceDescriptorRequest(name), 980 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 981 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 982 .call(); 983 } 984 985 @Override 986 public CompletableFuture<List<String>> listNamespaces() { 987 return this.<List<String>> newMasterCaller() 988 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 989 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 990 (s, c, req, done) -> s.listNamespaces(c, req, done), 991 (resp) -> resp.getNamespaceNameList())) 992 .call(); 993 } 994 995 @Override 996 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 997 return this.<List<NamespaceDescriptor>> newMasterCaller() 998 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 999 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 1000 ListNamespaceDescriptorsRequest.newBuilder().build(), 1001 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 1002 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 1003 .call(); 1004 } 1005 1006 @Override 1007 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 1008 return this.<List<RegionInfo>> newAdminCaller() 1009 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 1010 List<RegionInfo>> adminCall(controller, stub, 1011 RequestConverter.buildGetOnlineRegionRequest(), 1012 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 1013 resp -> ProtobufUtil.getRegionInfos(resp))) 1014 .serverName(serverName).call(); 1015 } 1016 1017 @Override 1018 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 1019 if (tableName.equals(META_TABLE_NAME)) { 1020 return connection.registry.getMetaRegionLocations() 1021 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 1022 .collect(Collectors.toList())); 1023 } else { 1024 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 1025 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 1026 } 1027 } 1028 1029 @Override 1030 public CompletableFuture<Void> flush(TableName tableName) { 1031 return flush(tableName, Collections.emptyList()); 1032 } 1033 1034 @Override 1035 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 1036 Preconditions.checkNotNull(columnFamily, 1037 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 1038 return flush(tableName, Collections.singletonList(columnFamily)); 1039 } 1040 1041 @Override 1042 public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { 1043 // This is for keeping compatibility with old implementation. 1044 // If the server version is lower than the client version, it's possible that the 1045 // flushTable method is not present in the server side, if so, we need to fall back 1046 // to the old implementation. 1047 Preconditions.checkNotNull(columnFamilyList, 1048 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 1049 List<byte[]> columnFamilies = columnFamilyList.stream() 1050 .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); 1051 FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, 1052 ng.getNonceGroup(), ng.newNonce()); 1053 CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( 1054 tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), 1055 (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); 1056 CompletableFuture<Void> future = new CompletableFuture<>(); 1057 addListener(procFuture, (ret, error) -> { 1058 if (error != null) { 1059 if ( 1060 error instanceof TableNotFoundException || error instanceof TableNotEnabledException 1061 || error instanceof NoSuchColumnFamilyException 1062 ) { 1063 future.completeExceptionally(error); 1064 } else if (error instanceof DoNotRetryIOException) { 1065 // usually this is caused by the method is not present on the server or 1066 // the hbase hadoop version does not match the running hadoop version. 1067 // if that happens, we need fall back to the old flush implementation. 1068 LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error); 1069 legacyFlush(future, tableName, columnFamilies); 1070 } else { 1071 future.completeExceptionally(error); 1072 } 1073 } else { 1074 future.complete(ret); 1075 } 1076 }); 1077 return future; 1078 } 1079 1080 private void legacyFlush(CompletableFuture<Void> future, TableName tableName, 1081 List<byte[]> columnFamilies) { 1082 addListener(tableExists(tableName), (exists, err) -> { 1083 if (err != null) { 1084 future.completeExceptionally(err); 1085 } else if (!exists) { 1086 future.completeExceptionally(new TableNotFoundException(tableName)); 1087 } else { 1088 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 1089 if (err2 != null) { 1090 future.completeExceptionally(err2); 1091 } else if (!tableEnabled) { 1092 future.completeExceptionally(new TableNotEnabledException(tableName)); 1093 } else { 1094 Map<String, String> props = new HashMap<>(); 1095 if (columnFamilies != null && !columnFamilies.isEmpty()) { 1096 props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER 1097 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); 1098 } 1099 addListener( 1100 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 1101 (ret, err3) -> { 1102 if (err3 != null) { 1103 future.completeExceptionally(err3); 1104 } else { 1105 future.complete(ret); 1106 } 1107 }); 1108 } 1109 }); 1110 } 1111 }); 1112 } 1113 1114 @Override 1115 public CompletableFuture<Void> flushRegion(byte[] regionName) { 1116 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 1117 }); 1118 } 1119 1120 @Override 1121 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 1122 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1123 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 1124 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 1125 }); 1126 } 1127 1128 /** 1129 * This method is for internal use only, where we need the response of the flush. 1130 * <p/> 1131 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1132 * API. 1133 */ 1134 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1135 boolean writeFlushWALMarker) { 1136 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1137 addListener(getRegionLocation(regionName), (location, err) -> { 1138 if (err != null) { 1139 future.completeExceptionally(err); 1140 return; 1141 } 1142 ServerName serverName = location.getServerName(); 1143 if (serverName == null) { 1144 future 1145 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1146 return; 1147 } 1148 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1149 (ret, err2) -> { 1150 if (err2 != null) { 1151 future.completeExceptionally(err2); 1152 } else { 1153 future.complete(ret); 1154 } 1155 }); 1156 }); 1157 return future; 1158 } 1159 1160 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1161 byte[] columnFamily, boolean writeFlushWALMarker) { 1162 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1163 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1164 FlushRegionResponse> adminCall(controller, stub, 1165 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1166 writeFlushWALMarker), 1167 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1168 .call(); 1169 } 1170 1171 @Override 1172 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1173 CompletableFuture<Void> future = new CompletableFuture<>(); 1174 addListener(getRegions(sn), (hRegionInfos, err) -> { 1175 if (err != null) { 1176 future.completeExceptionally(err); 1177 return; 1178 } 1179 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1180 if (hRegionInfos != null) { 1181 hRegionInfos 1182 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1183 }))); 1184 } 1185 addListener(CompletableFuture.allOf( 1186 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1187 if (err2 != null) { 1188 future.completeExceptionally(err2); 1189 } else { 1190 future.complete(ret); 1191 } 1192 }); 1193 }); 1194 return future; 1195 } 1196 1197 @Override 1198 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1199 return compact(tableName, null, false, compactType); 1200 } 1201 1202 @Override 1203 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1204 CompactType compactType) { 1205 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1206 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1207 return compact(tableName, columnFamily, false, compactType); 1208 } 1209 1210 @Override 1211 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1212 return compactRegion(regionName, null, false); 1213 } 1214 1215 @Override 1216 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1217 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1218 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1219 return compactRegion(regionName, columnFamily, false); 1220 } 1221 1222 @Override 1223 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1224 return compact(tableName, null, true, compactType); 1225 } 1226 1227 @Override 1228 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1229 CompactType compactType) { 1230 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1231 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1232 return compact(tableName, columnFamily, true, compactType); 1233 } 1234 1235 @Override 1236 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1237 return compactRegion(regionName, null, true); 1238 } 1239 1240 @Override 1241 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1242 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1243 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1244 return compactRegion(regionName, columnFamily, true); 1245 } 1246 1247 @Override 1248 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1249 return compactRegionServer(sn, false); 1250 } 1251 1252 @Override 1253 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1254 return compactRegionServer(sn, true); 1255 } 1256 1257 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1258 CompletableFuture<Void> future = new CompletableFuture<>(); 1259 addListener(getRegions(sn), (hRegionInfos, err) -> { 1260 if (err != null) { 1261 future.completeExceptionally(err); 1262 return; 1263 } 1264 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1265 if (hRegionInfos != null) { 1266 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1267 } 1268 addListener(CompletableFuture.allOf( 1269 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1270 if (err2 != null) { 1271 future.completeExceptionally(err2); 1272 } else { 1273 future.complete(ret); 1274 } 1275 }); 1276 }); 1277 return future; 1278 } 1279 1280 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1281 boolean major) { 1282 CompletableFuture<Void> future = new CompletableFuture<>(); 1283 addListener(getRegionLocation(regionName), (location, err) -> { 1284 if (err != null) { 1285 future.completeExceptionally(err); 1286 return; 1287 } 1288 ServerName serverName = location.getServerName(); 1289 if (serverName == null) { 1290 future 1291 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1292 return; 1293 } 1294 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1295 (ret, err2) -> { 1296 if (err2 != null) { 1297 future.completeExceptionally(err2); 1298 } else { 1299 future.complete(ret); 1300 } 1301 }); 1302 }); 1303 return future; 1304 } 1305 1306 /** 1307 * List all region locations for the specific table. 1308 */ 1309 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1310 if (TableName.META_TABLE_NAME.equals(tableName)) { 1311 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1312 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1313 if (err != null) { 1314 future.completeExceptionally(err); 1315 } else if ( 1316 metaRegions == null || metaRegions.isEmpty() 1317 || metaRegions.getDefaultRegionLocation() == null 1318 ) { 1319 future.completeExceptionally(new IOException("meta region does not found")); 1320 } else { 1321 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1322 } 1323 }); 1324 return future; 1325 } else { 1326 // For non-meta table, we fetch all locations by scanning hbase:meta table 1327 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1328 } 1329 } 1330 1331 /** 1332 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1333 */ 1334 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1335 CompactType compactType) { 1336 CompletableFuture<Void> future = new CompletableFuture<>(); 1337 1338 switch (compactType) { 1339 case MOB: 1340 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1341 if (err != null) { 1342 future.completeExceptionally(err); 1343 return; 1344 } 1345 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1346 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1347 if (err2 != null) { 1348 future.completeExceptionally(err2); 1349 } else { 1350 future.complete(ret); 1351 } 1352 }); 1353 }); 1354 break; 1355 case NORMAL: 1356 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1357 if (err != null) { 1358 future.completeExceptionally(err); 1359 return; 1360 } 1361 if (locations == null || locations.isEmpty()) { 1362 future.completeExceptionally(new TableNotFoundException(tableName)); 1363 return; 1364 } 1365 CompletableFuture<?>[] compactFutures = 1366 locations.stream().filter(l -> l.getRegion() != null) 1367 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1368 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1369 .toArray(CompletableFuture<?>[]::new); 1370 // future complete unless all of the compact futures are completed. 1371 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1372 if (err2 != null) { 1373 future.completeExceptionally(err2); 1374 } else { 1375 future.complete(ret); 1376 } 1377 }); 1378 }); 1379 break; 1380 default: 1381 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1382 } 1383 return future; 1384 } 1385 1386 /** 1387 * Compact the region at specific region server. 1388 */ 1389 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1390 final boolean major, byte[] columnFamily) { 1391 return this.<Void> newAdminCaller().serverName(sn) 1392 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1393 Void> adminCall(controller, stub, 1394 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1395 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1396 .call(); 1397 } 1398 1399 private byte[] toEncodeRegionName(byte[] regionName) { 1400 return RegionInfo.isEncodedRegionName(regionName) 1401 ? regionName 1402 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1403 } 1404 1405 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1406 CompletableFuture<TableName> result) { 1407 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1408 if (err != null) { 1409 result.completeExceptionally(err); 1410 return; 1411 } 1412 RegionInfo regionInfo = location.getRegion(); 1413 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1414 result.completeExceptionally( 1415 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1416 return; 1417 } 1418 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1419 if (!tableName.get().equals(regionInfo.getTable())) { 1420 // tables of this two region should be same. 1421 result.completeExceptionally( 1422 new IllegalArgumentException("Cannot merge regions from two different tables " 1423 + tableName.get() + " and " + regionInfo.getTable())); 1424 } else { 1425 result.complete(tableName.get()); 1426 } 1427 } 1428 }); 1429 } 1430 1431 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1432 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1433 CompletableFuture<TableName> future = new CompletableFuture<>(); 1434 for (byte[] encodedRegionName : encodedRegionNames) { 1435 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1436 } 1437 return future; 1438 } 1439 1440 @Override 1441 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1442 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1443 } 1444 1445 @Override 1446 public CompletableFuture<Boolean> isMergeEnabled() { 1447 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1448 } 1449 1450 @Override 1451 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1452 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1453 } 1454 1455 @Override 1456 public CompletableFuture<Boolean> isSplitEnabled() { 1457 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1458 } 1459 1460 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1461 MasterSwitchType switchType) { 1462 SetSplitOrMergeEnabledRequest request = 1463 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1464 return this.<Boolean> newMasterCaller() 1465 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1466 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1467 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1468 (resp) -> resp.getPrevValueList().get(0))) 1469 .call(); 1470 } 1471 1472 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1473 IsSplitOrMergeEnabledRequest request = 1474 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1475 return this.<Boolean> newMasterCaller() 1476 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1477 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1478 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1479 .call(); 1480 } 1481 1482 @Override 1483 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1484 if (nameOfRegionsToMerge.size() < 2) { 1485 return failedFuture(new IllegalArgumentException( 1486 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1487 } 1488 CompletableFuture<Void> future = new CompletableFuture<>(); 1489 byte[][] encodedNameOfRegionsToMerge = 1490 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1491 1492 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1493 if (err != null) { 1494 future.completeExceptionally(err); 1495 return; 1496 } 1497 1498 final MergeTableRegionsRequest request; 1499 try { 1500 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1501 forcible, ng.getNonceGroup(), ng.newNonce()); 1502 } catch (DeserializationException e) { 1503 future.completeExceptionally(e); 1504 return; 1505 } 1506 1507 addListener( 1508 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1509 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1510 (ret, err2) -> { 1511 if (err2 != null) { 1512 future.completeExceptionally(err2); 1513 } else { 1514 future.complete(ret); 1515 } 1516 }); 1517 }); 1518 return future; 1519 } 1520 1521 @Override 1522 public CompletableFuture<Void> split(TableName tableName) { 1523 CompletableFuture<Void> future = new CompletableFuture<>(); 1524 addListener(tableExists(tableName), (exist, error) -> { 1525 if (error != null) { 1526 future.completeExceptionally(error); 1527 return; 1528 } 1529 if (!exist) { 1530 future.completeExceptionally(new TableNotFoundException(tableName)); 1531 return; 1532 } 1533 addListener(metaTable 1534 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1535 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1536 ClientMetaTableAccessor.QueryType.REGION)) 1537 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1538 ClientMetaTableAccessor.QueryType.REGION))), 1539 (results, err2) -> { 1540 if (err2 != null) { 1541 future.completeExceptionally(err2); 1542 return; 1543 } 1544 if (results != null && !results.isEmpty()) { 1545 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1546 for (Result r : results) { 1547 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1548 continue; 1549 } 1550 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1551 if (rl != null) { 1552 for (HRegionLocation h : rl.getRegionLocations()) { 1553 if (h != null && h.getServerName() != null) { 1554 RegionInfo hri = h.getRegion(); 1555 if ( 1556 hri == null || hri.isSplitParent() 1557 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1558 ) { 1559 continue; 1560 } 1561 splitFutures.add(split(hri, null)); 1562 } 1563 } 1564 } 1565 } 1566 addListener( 1567 CompletableFuture 1568 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1569 (ret, exception) -> { 1570 if (exception != null) { 1571 future.completeExceptionally(exception); 1572 return; 1573 } 1574 future.complete(ret); 1575 }); 1576 } else { 1577 future.complete(null); 1578 } 1579 }); 1580 }); 1581 return future; 1582 } 1583 1584 @Override 1585 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1586 CompletableFuture<Void> result = new CompletableFuture<>(); 1587 if (splitPoint == null) { 1588 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1589 } 1590 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1591 (loc, err) -> { 1592 if (err != null) { 1593 result.completeExceptionally(err); 1594 } else if (loc == null || loc.getRegion() == null) { 1595 result.completeExceptionally(new IllegalArgumentException( 1596 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1597 } else { 1598 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1599 if (err2 != null) { 1600 result.completeExceptionally(err2); 1601 } else { 1602 result.complete(ret); 1603 } 1604 1605 }); 1606 } 1607 }); 1608 return result; 1609 } 1610 1611 @Override 1612 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1613 CompletableFuture<Void> future = new CompletableFuture<>(); 1614 addListener(getRegionLocation(regionName), (location, err) -> { 1615 if (err != null) { 1616 future.completeExceptionally(err); 1617 return; 1618 } 1619 RegionInfo regionInfo = location.getRegion(); 1620 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1621 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1622 + "Replicas are auto-split when their primary is split.")); 1623 return; 1624 } 1625 ServerName serverName = location.getServerName(); 1626 if (serverName == null) { 1627 future 1628 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1629 return; 1630 } 1631 addListener(split(regionInfo, null), (ret, err2) -> { 1632 if (err2 != null) { 1633 future.completeExceptionally(err2); 1634 } else { 1635 future.complete(ret); 1636 } 1637 }); 1638 }); 1639 return future; 1640 } 1641 1642 @Override 1643 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1644 Preconditions.checkNotNull(splitPoint, 1645 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1646 CompletableFuture<Void> future = new CompletableFuture<>(); 1647 addListener(getRegionLocation(regionName), (location, err) -> { 1648 if (err != null) { 1649 future.completeExceptionally(err); 1650 return; 1651 } 1652 RegionInfo regionInfo = location.getRegion(); 1653 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1654 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1655 + "Replicas are auto-split when their primary is split.")); 1656 return; 1657 } 1658 ServerName serverName = location.getServerName(); 1659 if (serverName == null) { 1660 future 1661 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1662 return; 1663 } 1664 if ( 1665 regionInfo.getStartKey() != null 1666 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1667 ) { 1668 future.completeExceptionally( 1669 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1670 return; 1671 } 1672 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1673 if (err2 != null) { 1674 future.completeExceptionally(err2); 1675 } else { 1676 future.complete(ret); 1677 } 1678 }); 1679 }); 1680 return future; 1681 } 1682 1683 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1684 CompletableFuture<Void> future = new CompletableFuture<>(); 1685 TableName tableName = hri.getTable(); 1686 final SplitTableRegionRequest request; 1687 try { 1688 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1689 ng.newNonce()); 1690 } catch (DeserializationException e) { 1691 future.completeExceptionally(e); 1692 return future; 1693 } 1694 1695 addListener( 1696 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1697 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1698 (ret, err2) -> { 1699 if (err2 != null) { 1700 future.completeExceptionally(err2); 1701 } else { 1702 future.complete(ret); 1703 } 1704 }); 1705 return future; 1706 } 1707 1708 @Override 1709 public CompletableFuture<Void> truncateRegion(byte[] regionName) { 1710 CompletableFuture<Void> future = new CompletableFuture<>(); 1711 addListener(getRegionLocation(regionName), (location, err) -> { 1712 if (err != null) { 1713 future.completeExceptionally(err); 1714 return; 1715 } 1716 RegionInfo regionInfo = location.getRegion(); 1717 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1718 future.completeExceptionally(new IllegalArgumentException( 1719 "Can't truncate replicas directly.Replicas are auto-truncated " 1720 + "when their primary is truncated.")); 1721 return; 1722 } 1723 ServerName serverName = location.getServerName(); 1724 if (serverName == null) { 1725 future 1726 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1727 return; 1728 } 1729 addListener(truncateRegion(regionInfo), (ret, err2) -> { 1730 if (err2 != null) { 1731 future.completeExceptionally(err2); 1732 } else { 1733 future.complete(ret); 1734 } 1735 }); 1736 }); 1737 return future; 1738 } 1739 1740 private CompletableFuture<Void> truncateRegion(final RegionInfo hri) { 1741 CompletableFuture<Void> future = new CompletableFuture<>(); 1742 TableName tableName = hri.getTable(); 1743 final MasterProtos.TruncateRegionRequest request; 1744 try { 1745 request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce()); 1746 } catch (DeserializationException e) { 1747 future.completeExceptionally(e); 1748 return future; 1749 } 1750 addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion, 1751 MasterProtos.TruncateRegionResponse::getProcId, 1752 new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1753 if (err2 != null) { 1754 future.completeExceptionally(err2); 1755 } else { 1756 future.complete(ret); 1757 } 1758 }); 1759 return future; 1760 } 1761 1762 @Override 1763 public CompletableFuture<Void> assign(byte[] regionName) { 1764 CompletableFuture<Void> future = new CompletableFuture<>(); 1765 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1766 if (err != null) { 1767 future.completeExceptionally(err); 1768 return; 1769 } 1770 addListener( 1771 this.<Void> newMasterCaller().tableName(regionInfo.getTable()) 1772 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1773 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1774 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1775 .call(), 1776 (ret, err2) -> { 1777 if (err2 != null) { 1778 future.completeExceptionally(err2); 1779 } else { 1780 future.complete(ret); 1781 } 1782 }); 1783 }); 1784 return future; 1785 } 1786 1787 @Override 1788 public CompletableFuture<Void> unassign(byte[] regionName) { 1789 CompletableFuture<Void> future = new CompletableFuture<>(); 1790 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1791 if (err != null) { 1792 future.completeExceptionally(err); 1793 return; 1794 } 1795 addListener( 1796 this.<Void> newMasterCaller().tableName(regionInfo.getTable()) 1797 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1798 Void> call(controller, stub, 1799 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1800 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1801 .call(), 1802 (ret, err2) -> { 1803 if (err2 != null) { 1804 future.completeExceptionally(err2); 1805 } else { 1806 future.complete(ret); 1807 } 1808 }); 1809 }); 1810 return future; 1811 } 1812 1813 @Override 1814 public CompletableFuture<Void> offline(byte[] regionName) { 1815 CompletableFuture<Void> future = new CompletableFuture<>(); 1816 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1817 if (err != null) { 1818 future.completeExceptionally(err); 1819 return; 1820 } 1821 addListener(this.<Void> newMasterCaller().tableName(regionInfo.getTable()) 1822 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1823 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1824 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1825 .call(), (ret, err2) -> { 1826 if (err2 != null) { 1827 future.completeExceptionally(err2); 1828 } else { 1829 future.complete(ret); 1830 } 1831 }); 1832 }); 1833 return future; 1834 } 1835 1836 @Override 1837 public CompletableFuture<Void> move(byte[] regionName) { 1838 CompletableFuture<Void> future = new CompletableFuture<>(); 1839 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1840 if (err != null) { 1841 future.completeExceptionally(err); 1842 return; 1843 } 1844 addListener( 1845 moveRegion(regionInfo, 1846 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1847 (ret, err2) -> { 1848 if (err2 != null) { 1849 future.completeExceptionally(err2); 1850 } else { 1851 future.complete(ret); 1852 } 1853 }); 1854 }); 1855 return future; 1856 } 1857 1858 @Override 1859 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1860 Preconditions.checkNotNull(destServerName, 1861 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1862 CompletableFuture<Void> future = new CompletableFuture<>(); 1863 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1864 if (err != null) { 1865 future.completeExceptionally(err); 1866 return; 1867 } 1868 addListener( 1869 moveRegion(regionInfo, RequestConverter 1870 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1871 (ret, err2) -> { 1872 if (err2 != null) { 1873 future.completeExceptionally(err2); 1874 } else { 1875 future.complete(ret); 1876 } 1877 }); 1878 }); 1879 return future; 1880 } 1881 1882 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1883 return this.<Void> newMasterCaller().tableName(regionInfo.getTable()) 1884 .action( 1885 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1886 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1887 .call(); 1888 } 1889 1890 @Override 1891 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1892 return this.<Void> newMasterCaller() 1893 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1894 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1895 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1896 .call(); 1897 } 1898 1899 @Override 1900 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1901 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1902 Scan scan = QuotaTableUtil.makeScan(filter); 1903 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1904 new AdvancedScanResultConsumer() { 1905 List<QuotaSettings> settings = new ArrayList<>(); 1906 1907 @Override 1908 public void onNext(Result[] results, ScanController controller) { 1909 for (Result result : results) { 1910 try { 1911 QuotaTableUtil.parseResultToCollection(result, settings); 1912 } catch (IOException e) { 1913 controller.terminate(); 1914 future.completeExceptionally(e); 1915 } 1916 } 1917 } 1918 1919 @Override 1920 public void onError(Throwable error) { 1921 future.completeExceptionally(error); 1922 } 1923 1924 @Override 1925 public void onComplete() { 1926 future.complete(settings); 1927 } 1928 }); 1929 return future; 1930 } 1931 1932 @Override 1933 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1934 boolean enabled) { 1935 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1936 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1937 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1938 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1939 } 1940 1941 @Override 1942 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1943 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1944 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1945 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1946 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1947 } 1948 1949 @Override 1950 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1951 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1952 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1953 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1954 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1955 } 1956 1957 @Override 1958 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1959 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1960 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1961 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1962 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1963 } 1964 1965 @Override 1966 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1967 return this.<ReplicationPeerConfig> newMasterCaller() 1968 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1969 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1970 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1971 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1972 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1973 .call(); 1974 } 1975 1976 @Override 1977 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1978 ReplicationPeerConfig peerConfig) { 1979 return this.<UpdateReplicationPeerConfigRequest, 1980 UpdateReplicationPeerConfigResponse> procedureCall( 1981 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1982 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1983 (resp) -> resp.getProcId(), 1984 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1985 } 1986 1987 @Override 1988 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1989 SyncReplicationState clusterState) { 1990 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1991 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1992 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1993 clusterState), 1994 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1995 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1996 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1997 } 1998 1999 @Override 2000 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 2001 Map<TableName, List<String>> tableCfs) { 2002 if (tableCfs == null) { 2003 return failedFuture(new ReplicationException("tableCfs is null")); 2004 } 2005 2006 CompletableFuture<Void> future = new CompletableFuture<>(); 2007 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 2008 if (!completeExceptionally(future, error)) { 2009 ReplicationPeerConfig newPeerConfig = 2010 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 2011 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 2012 if (!completeExceptionally(future, error)) { 2013 future.complete(result); 2014 } 2015 }); 2016 } 2017 }); 2018 return future; 2019 } 2020 2021 @Override 2022 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 2023 Map<TableName, List<String>> tableCfs) { 2024 if (tableCfs == null) { 2025 return failedFuture(new ReplicationException("tableCfs is null")); 2026 } 2027 2028 CompletableFuture<Void> future = new CompletableFuture<>(); 2029 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 2030 if (!completeExceptionally(future, error)) { 2031 ReplicationPeerConfig newPeerConfig = null; 2032 try { 2033 newPeerConfig = ReplicationPeerConfigUtil 2034 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 2035 } catch (ReplicationException e) { 2036 future.completeExceptionally(e); 2037 return; 2038 } 2039 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 2040 if (!completeExceptionally(future, error)) { 2041 future.complete(result); 2042 } 2043 }); 2044 } 2045 }); 2046 return future; 2047 } 2048 2049 @Override 2050 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 2051 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 2052 } 2053 2054 @Override 2055 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 2056 Preconditions.checkNotNull(pattern, 2057 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 2058 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 2059 } 2060 2061 private CompletableFuture<List<ReplicationPeerDescription>> 2062 listReplicationPeers(ListReplicationPeersRequest request) { 2063 return this.<List<ReplicationPeerDescription>> newMasterCaller() 2064 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 2065 List<ReplicationPeerDescription>> call(controller, stub, request, 2066 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 2067 (resp) -> resp.getPeerDescList().stream() 2068 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 2069 .collect(Collectors.toList()))) 2070 .call(); 2071 } 2072 2073 @Override 2074 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 2075 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 2076 addListener(listTableDescriptors(), (tables, error) -> { 2077 if (!completeExceptionally(future, error)) { 2078 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 2079 tables.forEach(table -> { 2080 Map<String, Integer> cfs = new HashMap<>(); 2081 Stream.of(table.getColumnFamilies()) 2082 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 2083 .forEach(column -> { 2084 cfs.put(column.getNameAsString(), column.getScope()); 2085 }); 2086 if (!cfs.isEmpty()) { 2087 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 2088 } 2089 }); 2090 future.complete(replicatedTableCFs); 2091 } 2092 }); 2093 return future; 2094 } 2095 2096 @Override 2097 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 2098 SnapshotProtos.SnapshotDescription snapshot = 2099 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2100 try { 2101 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2102 } catch (IllegalArgumentException e) { 2103 return failedFuture(e); 2104 } 2105 CompletableFuture<Void> future = new CompletableFuture<>(); 2106 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2107 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 2108 addListener(this.<SnapshotResponse> newMasterCaller() 2109 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 2110 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 2111 .call(), (resp, err) -> { 2112 if (err != null) { 2113 future.completeExceptionally(err); 2114 return; 2115 } 2116 waitSnapshotFinish(snapshotDesc, future, resp); 2117 }); 2118 return future; 2119 } 2120 2121 // This is for keeping compatibility with old implementation. 2122 // If there is a procId field in the response, then the snapshot will be operated with a 2123 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 2124 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 2125 SnapshotResponse resp) { 2126 if (resp.hasProcId()) { 2127 getProcedureResult(resp.getProcId(), src -> null, future, 0); 2128 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 2129 } else { 2130 long expectedTimeout = resp.getExpectedTimeout(); 2131 TimerTask pollingTask = new TimerTask() { 2132 int tries = 0; 2133 long startTime = EnvironmentEdgeManager.currentTime(); 2134 long endTime = startTime + expectedTimeout; 2135 long maxPauseTime = expectedTimeout / maxAttempts; 2136 2137 @Override 2138 public void run(Timeout timeout) throws Exception { 2139 if (EnvironmentEdgeManager.currentTime() < endTime) { 2140 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 2141 if (err2 != null) { 2142 future.completeExceptionally(err2); 2143 } else if (done) { 2144 future.complete(null); 2145 } else { 2146 // retry again after pauseTime. 2147 long pauseTime = 2148 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2149 pauseTime = Math.min(pauseTime, maxPauseTime); 2150 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 2151 } 2152 }); 2153 } else { 2154 future 2155 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 2156 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 2157 } 2158 } 2159 }; 2160 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2161 } 2162 } 2163 2164 @Override 2165 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 2166 return this.<Boolean> newMasterCaller() 2167 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 2168 Boolean> call(controller, stub, 2169 IsSnapshotDoneRequest.newBuilder() 2170 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2171 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 2172 .call(); 2173 } 2174 2175 @Override 2176 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 2177 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2178 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2179 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2180 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2181 } 2182 2183 @Override 2184 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2185 boolean restoreAcl) { 2186 CompletableFuture<Void> future = new CompletableFuture<>(); 2187 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2188 if (err != null) { 2189 future.completeExceptionally(err); 2190 return; 2191 } 2192 TableName tableName = null; 2193 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2194 for (SnapshotDescription snap : snapshotDescriptions) { 2195 if (snap.getName().equals(snapshotName)) { 2196 tableName = snap.getTableName(); 2197 break; 2198 } 2199 } 2200 } 2201 if (tableName == null) { 2202 future.completeExceptionally( 2203 new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist.")); 2204 return; 2205 } 2206 final TableName finalTableName = tableName; 2207 addListener(tableExists(finalTableName), (exists, err2) -> { 2208 if (err2 != null) { 2209 future.completeExceptionally(err2); 2210 } else if (!exists) { 2211 // if table does not exist, then just clone snapshot into new table. 2212 completeConditionalOnFuture(future, 2213 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2214 } else { 2215 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2216 if (err4 != null) { 2217 future.completeExceptionally(err4); 2218 } else if (!disabled) { 2219 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2220 } else { 2221 completeConditionalOnFuture(future, 2222 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2223 } 2224 }); 2225 } 2226 }); 2227 }); 2228 return future; 2229 } 2230 2231 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2232 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2233 if (takeFailSafeSnapshot) { 2234 CompletableFuture<Void> future = new CompletableFuture<>(); 2235 // Step.1 Take a snapshot of the current state 2236 String failSafeSnapshotSnapshotNameFormat = 2237 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2238 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2239 final String failSafeSnapshotSnapshotName = 2240 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2241 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2242 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2243 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2244 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2245 if (err != null) { 2246 future.completeExceptionally(err); 2247 } else { 2248 // Step.2 Restore snapshot 2249 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2250 (void2, err2) -> { 2251 if (err2 != null) { 2252 // Step.3.a Something went wrong during the restore and try to rollback. 2253 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2254 restoreAcl, null), (void3, err3) -> { 2255 if (err3 != null) { 2256 future.completeExceptionally(err3); 2257 } else { 2258 // If fail to restore snapshot but rollback successfully, delete the 2259 // restore-failsafe snapshot. 2260 LOG.info( 2261 "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2262 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> { 2263 if (err4 != null) { 2264 LOG.error("Unable to remove the failsafe snapshot: {}", 2265 failSafeSnapshotSnapshotName, err4); 2266 } 2267 String msg = 2268 "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot=" 2269 + failSafeSnapshotSnapshotName + " succeeded."; 2270 LOG.error(msg); 2271 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2272 }); 2273 } 2274 }); 2275 } else { 2276 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 2277 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2278 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2279 if (err3 != null) { 2280 LOG.error( 2281 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2282 err3); 2283 } 2284 future.complete(ret3); 2285 }); 2286 } 2287 }); 2288 } 2289 }); 2290 return future; 2291 } else { 2292 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2293 } 2294 } 2295 2296 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2297 CompletableFuture<T> parentFuture) { 2298 addListener(parentFuture, (res, err) -> { 2299 if (err != null) { 2300 dependentFuture.completeExceptionally(err); 2301 } else { 2302 dependentFuture.complete(res); 2303 } 2304 }); 2305 } 2306 2307 @Override 2308 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2309 boolean restoreAcl, String customSFT) { 2310 CompletableFuture<Void> future = new CompletableFuture<>(); 2311 addListener(tableExists(tableName), (exists, err) -> { 2312 if (err != null) { 2313 future.completeExceptionally(err); 2314 } else if (exists) { 2315 future.completeExceptionally(new TableExistsException(tableName)); 2316 } else { 2317 completeConditionalOnFuture(future, 2318 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2319 } 2320 }); 2321 return future; 2322 } 2323 2324 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2325 boolean restoreAcl, String customSFT) { 2326 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2327 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2328 try { 2329 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2330 } catch (IllegalArgumentException e) { 2331 return failedFuture(e); 2332 } 2333 RestoreSnapshotRequest.Builder builder = 2334 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2335 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2336 if (customSFT != null) { 2337 builder.setCustomSFT(customSFT); 2338 } 2339 return waitProcedureResult(this.<Long> newMasterCaller() 2340 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2341 Long> call(controller, stub, builder.build(), 2342 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2343 .call(), result -> null); 2344 } 2345 2346 @Override 2347 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2348 return getCompletedSnapshots(null); 2349 } 2350 2351 @Override 2352 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2353 Preconditions.checkNotNull(pattern, 2354 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2355 return getCompletedSnapshots(pattern); 2356 } 2357 2358 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2359 return this.<List<SnapshotDescription>> newMasterCaller() 2360 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2361 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2362 GetCompletedSnapshotsRequest.newBuilder().build(), 2363 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2364 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2365 .call(); 2366 } 2367 2368 @Override 2369 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2370 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2371 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2372 return getCompletedSnapshots(tableNamePattern, null); 2373 } 2374 2375 @Override 2376 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2377 Pattern snapshotNamePattern) { 2378 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2379 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2380 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2381 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2382 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2383 } 2384 2385 private CompletableFuture<List<SnapshotDescription>> 2386 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2387 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2388 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2389 if (err != null) { 2390 future.completeExceptionally(err); 2391 return; 2392 } 2393 if (tableNames == null || tableNames.size() <= 0) { 2394 future.complete(Collections.emptyList()); 2395 return; 2396 } 2397 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2398 if (err2 != null) { 2399 future.completeExceptionally(err2); 2400 return; 2401 } 2402 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2403 future.complete(Collections.emptyList()); 2404 return; 2405 } 2406 future.complete(snapshotDescList.stream() 2407 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2408 .collect(Collectors.toList())); 2409 }); 2410 }); 2411 return future; 2412 } 2413 2414 @Override 2415 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2416 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2417 } 2418 2419 @Override 2420 public CompletableFuture<Void> deleteSnapshots() { 2421 return internalDeleteSnapshots(null, null); 2422 } 2423 2424 @Override 2425 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2426 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2427 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2428 return internalDeleteSnapshots(null, snapshotNamePattern); 2429 } 2430 2431 @Override 2432 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2433 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2434 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2435 return internalDeleteSnapshots(tableNamePattern, null); 2436 } 2437 2438 @Override 2439 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2440 Pattern snapshotNamePattern) { 2441 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2442 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2443 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2444 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2445 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2446 } 2447 2448 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2449 Pattern snapshotNamePattern) { 2450 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2451 if (tableNamePattern == null) { 2452 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2453 } else { 2454 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2455 } 2456 CompletableFuture<Void> future = new CompletableFuture<>(); 2457 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2458 if (err != null) { 2459 future.completeExceptionally(err); 2460 return; 2461 } 2462 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2463 future.complete(null); 2464 return; 2465 } 2466 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2467 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2468 if (e != null) { 2469 future.completeExceptionally(e); 2470 } else { 2471 future.complete(v); 2472 } 2473 }); 2474 }); 2475 return future; 2476 } 2477 2478 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2479 return this.<Void> newMasterCaller() 2480 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2481 controller, stub, 2482 DeleteSnapshotRequest.newBuilder() 2483 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2484 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2485 .call(); 2486 } 2487 2488 @Override 2489 public CompletableFuture<Void> execProcedure(String signature, String instance, 2490 Map<String, String> props) { 2491 CompletableFuture<Void> future = new CompletableFuture<>(); 2492 ProcedureDescription procDesc = 2493 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2494 addListener(this.<Long> newMasterCaller() 2495 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2496 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2497 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2498 .call(), (expectedTimeout, err) -> { 2499 if (err != null) { 2500 future.completeExceptionally(err); 2501 return; 2502 } 2503 TimerTask pollingTask = new TimerTask() { 2504 int tries = 0; 2505 long startTime = EnvironmentEdgeManager.currentTime(); 2506 long endTime = startTime + expectedTimeout; 2507 long maxPauseTime = expectedTimeout / maxAttempts; 2508 2509 @Override 2510 public void run(Timeout timeout) throws Exception { 2511 if (EnvironmentEdgeManager.currentTime() < endTime) { 2512 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2513 if (err2 != null) { 2514 future.completeExceptionally(err2); 2515 return; 2516 } 2517 if (done) { 2518 future.complete(null); 2519 } else { 2520 // retry again after pauseTime. 2521 long pauseTime = 2522 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2523 pauseTime = Math.min(pauseTime, maxPauseTime); 2524 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2525 TimeUnit.MICROSECONDS); 2526 } 2527 }); 2528 } else { 2529 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2530 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2531 } 2532 } 2533 }; 2534 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2535 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2536 }); 2537 return future; 2538 } 2539 2540 @Override 2541 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2542 Map<String, String> props) { 2543 ProcedureDescription proDesc = 2544 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2545 return this.<byte[]> newMasterCaller() 2546 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2547 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2548 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2549 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2550 .call(); 2551 } 2552 2553 @Override 2554 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2555 Map<String, String> props) { 2556 ProcedureDescription proDesc = 2557 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2558 return this.<Boolean> newMasterCaller() 2559 .action( 2560 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2561 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2562 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2563 .call(); 2564 } 2565 2566 @Override 2567 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2568 return this.<Boolean> newMasterCaller().action( 2569 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2570 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2571 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2572 .call(); 2573 } 2574 2575 @Override 2576 public CompletableFuture<String> getProcedures() { 2577 return this.<String> newMasterCaller() 2578 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2579 controller, stub, GetProceduresRequest.newBuilder().build(), 2580 (s, c, req, done) -> s.getProcedures(c, req, done), 2581 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2582 .call(); 2583 } 2584 2585 @Override 2586 public CompletableFuture<String> getLocks() { 2587 return this.<String> newMasterCaller() 2588 .action( 2589 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2590 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2591 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2592 .call(); 2593 } 2594 2595 @Override 2596 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2597 boolean offload) { 2598 return this.<Void> newMasterCaller() 2599 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2600 DecommissionRegionServersResponse, Void> call(controller, stub, 2601 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2602 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2603 .call(); 2604 } 2605 2606 @Override 2607 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2608 return this.<List<ServerName>> newMasterCaller() 2609 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2610 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2611 ListDecommissionedRegionServersRequest.newBuilder().build(), 2612 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2613 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2614 .collect(Collectors.toList()))) 2615 .call(); 2616 } 2617 2618 @Override 2619 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2620 List<byte[]> encodedRegionNames) { 2621 return this.<Void> newMasterCaller() 2622 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2623 RecommissionRegionServerResponse, Void> call(controller, stub, 2624 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2625 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2626 .call(); 2627 } 2628 2629 /** 2630 * Get the region location for the passed region name. The region name may be a full region name 2631 * or encoded region name. If the region does not found, then it'll throw an 2632 * UnknownRegionException wrapped by a {@link CompletableFuture} 2633 * @param regionNameOrEncodedRegionName region name or encoded region name 2634 * @return region location, wrapped by a {@link CompletableFuture} 2635 */ 2636 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2637 if (regionNameOrEncodedRegionName == null) { 2638 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2639 } 2640 2641 CompletableFuture<Optional<HRegionLocation>> future; 2642 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2643 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2644 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2645 // old format encodedName, should be meta region 2646 future = connection.registry.getMetaRegionLocations() 2647 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2648 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2649 } else { 2650 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2651 regionNameOrEncodedRegionName); 2652 } 2653 } else { 2654 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2655 // it needs to throw out IllegalArgumentException in case tableName is passed in. 2656 RegionInfo regionInfo; 2657 try { 2658 regionInfo = 2659 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2660 } catch (IOException ioe) { 2661 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2662 } 2663 2664 if (regionInfo.isMetaRegion()) { 2665 future = connection.registry.getMetaRegionLocations() 2666 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2667 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2668 .findFirst()); 2669 } else { 2670 future = 2671 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2672 } 2673 } 2674 2675 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2676 addListener(future, (location, err) -> { 2677 if (err != null) { 2678 returnedFuture.completeExceptionally(err); 2679 return; 2680 } 2681 if (!location.isPresent() || location.get().getRegion() == null) { 2682 returnedFuture.completeExceptionally( 2683 new UnknownRegionException("Invalid region name or encoded region name: " 2684 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2685 } else { 2686 returnedFuture.complete(location.get()); 2687 } 2688 }); 2689 return returnedFuture; 2690 } 2691 2692 /** 2693 * Get the region info for the passed region name. The region name may be a full region name or 2694 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2695 * wrapped by a {@link CompletableFuture} 2696 * @return region info, wrapped by a {@link CompletableFuture} 2697 */ 2698 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2699 if (regionNameOrEncodedRegionName == null) { 2700 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2701 } 2702 2703 if ( 2704 Bytes.equals(regionNameOrEncodedRegionName, 2705 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2706 || Bytes.equals(regionNameOrEncodedRegionName, 2707 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2708 ) { 2709 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2710 } 2711 2712 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2713 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2714 if (err != null) { 2715 future.completeExceptionally(err); 2716 } else { 2717 future.complete(location.getRegion()); 2718 } 2719 }); 2720 return future; 2721 } 2722 2723 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2724 if (numRegions < 3) { 2725 throw new IllegalArgumentException("Must create at least three regions"); 2726 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2727 throw new IllegalArgumentException("Start key must be smaller than end key"); 2728 } 2729 if (numRegions == 3) { 2730 return new byte[][] { startKey, endKey }; 2731 } 2732 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2733 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2734 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2735 } 2736 return splitKeys; 2737 } 2738 2739 private void verifySplitKeys(byte[][] splitKeys) { 2740 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2741 // Verify there are no duplicate split keys 2742 byte[] lastKey = null; 2743 for (byte[] splitKey : splitKeys) { 2744 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2745 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2746 } 2747 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2748 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2749 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2750 } 2751 lastKey = splitKey; 2752 } 2753 } 2754 2755 private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> { 2756 2757 abstract void onFinished(); 2758 2759 abstract void onError(Throwable error); 2760 2761 @Override 2762 public void accept(T value, Throwable error) { 2763 if (error != null) { 2764 onError(error); 2765 return; 2766 } 2767 onFinished(); 2768 } 2769 } 2770 2771 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> { 2772 protected final TableName tableName; 2773 2774 TableProcedureBiConsumer(TableName tableName) { 2775 this.tableName = tableName; 2776 } 2777 2778 abstract String getOperationType(); 2779 2780 String getDescription() { 2781 return "Operation: " + getOperationType() + ", " + "Table Name: " 2782 + tableName.getNameWithNamespaceInclAsString(); 2783 } 2784 2785 @Override 2786 void onFinished() { 2787 LOG.info(getDescription() + " completed"); 2788 } 2789 2790 @Override 2791 void onError(Throwable error) { 2792 LOG.info(getDescription() + " failed with " + error.getMessage()); 2793 } 2794 } 2795 2796 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> { 2797 protected final String namespaceName; 2798 2799 NamespaceProcedureBiConsumer(String namespaceName) { 2800 this.namespaceName = namespaceName; 2801 } 2802 2803 abstract String getOperationType(); 2804 2805 String getDescription() { 2806 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2807 } 2808 2809 @Override 2810 void onFinished() { 2811 LOG.info("{} completed", getDescription()); 2812 } 2813 2814 @Override 2815 void onError(Throwable error) { 2816 LOG.info("{} failed with {}", getDescription(), error.getMessage()); 2817 } 2818 } 2819 2820 private static class RestoreBackupSystemTableProcedureBiConsumer extends ProcedureBiConsumer { 2821 2822 @Override 2823 void onFinished() { 2824 LOG.info("RestoreBackupSystemTableProcedure completed"); 2825 } 2826 2827 @Override 2828 void onError(Throwable error) { 2829 LOG.info("RestoreBackupSystemTableProcedure failed with {}", error.getMessage()); 2830 } 2831 } 2832 2833 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2834 2835 CreateTableProcedureBiConsumer(TableName tableName) { 2836 super(tableName); 2837 } 2838 2839 @Override 2840 String getOperationType() { 2841 return "CREATE"; 2842 } 2843 } 2844 2845 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2846 2847 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2848 super(tableName); 2849 } 2850 2851 @Override 2852 String getOperationType() { 2853 return "MODIFY"; 2854 } 2855 } 2856 2857 private static class ReopenTableRegionsProcedureBiConsumer extends TableProcedureBiConsumer { 2858 2859 ReopenTableRegionsProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2860 super(tableName); 2861 } 2862 2863 @Override 2864 String getOperationType() { 2865 return "REOPEN_TABLE_REGIONS"; 2866 } 2867 } 2868 2869 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2870 extends TableProcedureBiConsumer { 2871 2872 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2873 super(tableName); 2874 } 2875 2876 @Override 2877 String getOperationType() { 2878 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2879 } 2880 } 2881 2882 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2883 2884 DeleteTableProcedureBiConsumer(TableName tableName) { 2885 super(tableName); 2886 } 2887 2888 @Override 2889 String getOperationType() { 2890 return "DELETE"; 2891 } 2892 2893 @Override 2894 void onFinished() { 2895 connection.getLocator().clearCache(this.tableName); 2896 super.onFinished(); 2897 } 2898 } 2899 2900 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2901 2902 TruncateTableProcedureBiConsumer(TableName tableName) { 2903 super(tableName); 2904 } 2905 2906 @Override 2907 String getOperationType() { 2908 return "TRUNCATE"; 2909 } 2910 } 2911 2912 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2913 2914 EnableTableProcedureBiConsumer(TableName tableName) { 2915 super(tableName); 2916 } 2917 2918 @Override 2919 String getOperationType() { 2920 return "ENABLE"; 2921 } 2922 } 2923 2924 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2925 2926 DisableTableProcedureBiConsumer(TableName tableName) { 2927 super(tableName); 2928 } 2929 2930 @Override 2931 String getOperationType() { 2932 return "DISABLE"; 2933 } 2934 } 2935 2936 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2937 2938 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2939 super(tableName); 2940 } 2941 2942 @Override 2943 String getOperationType() { 2944 return "ADD_COLUMN_FAMILY"; 2945 } 2946 } 2947 2948 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2949 2950 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2951 super(tableName); 2952 } 2953 2954 @Override 2955 String getOperationType() { 2956 return "DELETE_COLUMN_FAMILY"; 2957 } 2958 } 2959 2960 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2961 2962 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2963 super(tableName); 2964 } 2965 2966 @Override 2967 String getOperationType() { 2968 return "MODIFY_COLUMN_FAMILY"; 2969 } 2970 } 2971 2972 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2973 extends TableProcedureBiConsumer { 2974 2975 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2976 super(tableName); 2977 } 2978 2979 @Override 2980 String getOperationType() { 2981 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2982 } 2983 } 2984 2985 private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { 2986 2987 FlushTableProcedureBiConsumer(TableName tableName) { 2988 super(tableName); 2989 } 2990 2991 @Override 2992 String getOperationType() { 2993 return "FLUSH"; 2994 } 2995 } 2996 2997 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2998 2999 CreateNamespaceProcedureBiConsumer(String namespaceName) { 3000 super(namespaceName); 3001 } 3002 3003 @Override 3004 String getOperationType() { 3005 return "CREATE_NAMESPACE"; 3006 } 3007 } 3008 3009 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 3010 3011 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 3012 super(namespaceName); 3013 } 3014 3015 @Override 3016 String getOperationType() { 3017 return "DELETE_NAMESPACE"; 3018 } 3019 } 3020 3021 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 3022 3023 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 3024 super(namespaceName); 3025 } 3026 3027 @Override 3028 String getOperationType() { 3029 return "MODIFY_NAMESPACE"; 3030 } 3031 } 3032 3033 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3034 3035 MergeTableRegionProcedureBiConsumer(TableName tableName) { 3036 super(tableName); 3037 } 3038 3039 @Override 3040 String getOperationType() { 3041 return "MERGE_REGIONS"; 3042 } 3043 } 3044 3045 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3046 3047 SplitTableRegionProcedureBiConsumer(TableName tableName) { 3048 super(tableName); 3049 } 3050 3051 @Override 3052 String getOperationType() { 3053 return "SPLIT_REGION"; 3054 } 3055 } 3056 3057 private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3058 3059 TruncateRegionProcedureBiConsumer(TableName tableName) { 3060 super(tableName); 3061 } 3062 3063 @Override 3064 String getOperationType() { 3065 return "TRUNCATE_REGION"; 3066 } 3067 } 3068 3069 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 3070 SnapshotProcedureBiConsumer(TableName tableName) { 3071 super(tableName); 3072 } 3073 3074 @Override 3075 String getOperationType() { 3076 return "SNAPSHOT"; 3077 } 3078 } 3079 3080 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> { 3081 private final String peerId; 3082 private final Supplier<String> getOperation; 3083 3084 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 3085 this.peerId = peerId; 3086 this.getOperation = getOperation; 3087 } 3088 3089 String getDescription() { 3090 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 3091 } 3092 3093 @Override 3094 void onFinished() { 3095 LOG.info("{} completed", getDescription()); 3096 } 3097 3098 @Override 3099 void onError(Throwable error) { 3100 LOG.info("{} failed with {}", getDescription(), error.getMessage()); 3101 } 3102 } 3103 3104 private static final class RollAllWALWritersBiConsumer 3105 extends ProcedureBiConsumer<Map<ServerName, Long>> { 3106 3107 @Override 3108 void onFinished() { 3109 LOG.info("Rolling all WAL writers completed"); 3110 } 3111 3112 @Override 3113 void onError(Throwable error) { 3114 LOG.warn("Rolling all WAL writers failed with {}", error.getMessage()); 3115 } 3116 } 3117 3118 private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture, 3119 Converter<T, ByteString> converter) { 3120 CompletableFuture<T> future = new CompletableFuture<>(); 3121 addListener(procFuture, (procId, error) -> { 3122 if (error != null) { 3123 future.completeExceptionally(error); 3124 return; 3125 } 3126 getProcedureResult(procId, converter, future, 0); 3127 }); 3128 return future; 3129 } 3130 3131 private <T> void getProcedureResult(long procId, Converter<T, ByteString> converter, 3132 CompletableFuture<T> future, int retries) { 3133 addListener( 3134 this.<GetProcedureResultResponse> newMasterCaller() 3135 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 3136 GetProcedureResultResponse> call(controller, stub, 3137 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 3138 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 3139 .call(), 3140 (response, error) -> { 3141 if (error != null) { 3142 Throwable exc = ConnectionUtils.translateException(error); 3143 if (exc instanceof DoNotRetryIOException) { 3144 // stop retrying on DNRIOE 3145 future.completeExceptionally(exc); 3146 } else { 3147 LOG.warn("failed to get the procedure result procId={}", procId, exc); 3148 retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), 3149 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3150 } 3151 return; 3152 } 3153 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 3154 retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), 3155 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3156 return; 3157 } 3158 if (response.hasException()) { 3159 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 3160 future.completeExceptionally(ioe); 3161 } else { 3162 try { 3163 future.complete(converter.convert(response.getResult())); 3164 } catch (IOException e) { 3165 future.completeExceptionally(e); 3166 } 3167 } 3168 }); 3169 } 3170 3171 private <T> CompletableFuture<T> failedFuture(Throwable error) { 3172 CompletableFuture<T> future = new CompletableFuture<>(); 3173 future.completeExceptionally(error); 3174 return future; 3175 } 3176 3177 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 3178 if (error != null) { 3179 future.completeExceptionally(error); 3180 return true; 3181 } 3182 return false; 3183 } 3184 3185 @Override 3186 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 3187 return getClusterMetrics(EnumSet.allOf(Option.class)); 3188 } 3189 3190 @Override 3191 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 3192 return this.<ClusterMetrics> newMasterCaller() 3193 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 3194 ClusterMetrics> call(controller, stub, 3195 RequestConverter.buildGetClusterStatusRequest(options), 3196 (s, c, req, done) -> s.getClusterStatus(c, req, done), 3197 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 3198 .call(); 3199 } 3200 3201 @Override 3202 public CompletableFuture<Void> shutdown() { 3203 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3204 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 3205 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 3206 resp -> null)) 3207 .call(); 3208 } 3209 3210 @Override 3211 public CompletableFuture<Void> stopMaster() { 3212 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3213 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 3214 controller, stub, StopMasterRequest.newBuilder().build(), 3215 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 3216 .call(); 3217 } 3218 3219 @Override 3220 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 3221 StopServerRequest request = RequestConverter 3222 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 3223 return this.<Void> newAdminCaller().priority(HIGH_QOS) 3224 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 3225 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 3226 resp -> null)) 3227 .serverName(serverName).call(); 3228 } 3229 3230 @Override 3231 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 3232 return this.<Void> newAdminCaller() 3233 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 3234 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 3235 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 3236 .serverName(serverName).call(); 3237 } 3238 3239 @Override 3240 public CompletableFuture<Void> updateConfiguration() { 3241 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3242 addListener( 3243 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 3244 (status, err) -> { 3245 if (err != null) { 3246 future.completeExceptionally(err); 3247 } else { 3248 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3249 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 3250 futures.add(updateConfiguration(status.getMasterName())); 3251 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 3252 addListener( 3253 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3254 (result, err2) -> { 3255 if (err2 != null) { 3256 future.completeExceptionally(err2); 3257 } else { 3258 future.complete(result); 3259 } 3260 }); 3261 } 3262 }); 3263 return future; 3264 } 3265 3266 @Override 3267 public CompletableFuture<Void> updateConfiguration(String groupName) { 3268 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3269 addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3270 if (err != null) { 3271 future.completeExceptionally(err); 3272 } else if (rsGroupInfo == null) { 3273 future.completeExceptionally( 3274 new IllegalArgumentException("Group does not exist: " + groupName)); 3275 } else { 3276 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3277 if (err2 != null) { 3278 future.completeExceptionally(err2); 3279 } else { 3280 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3281 List<ServerName> groupServers = status.getServersName().stream() 3282 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3283 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3284 addListener( 3285 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3286 (result, err3) -> { 3287 if (err3 != null) { 3288 future.completeExceptionally(err3); 3289 } else { 3290 future.complete(result); 3291 } 3292 }); 3293 } 3294 }); 3295 } 3296 }); 3297 return future; 3298 } 3299 3300 @Override 3301 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3302 return this.<Void> newAdminCaller() 3303 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3304 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3305 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3306 .serverName(serverName).call(); 3307 } 3308 3309 @Override 3310 public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() { 3311 return this 3312 .<RollAllWALWritersRequest, RollAllWALWritersResponse, 3313 Map<ServerName, 3314 Long>> procedureCall( 3315 RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()), 3316 (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(), 3317 result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap() 3318 .entrySet().stream().collect(Collectors 3319 .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)), 3320 new RollAllWALWritersBiConsumer()); 3321 } 3322 3323 @Override 3324 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3325 return this.<Void> newAdminCaller() 3326 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3327 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3328 RequestConverter.buildClearCompactionQueuesRequest(queues), 3329 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3330 .serverName(serverName).call(); 3331 } 3332 3333 @Override 3334 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3335 return this.<List<SecurityCapability>> newMasterCaller() 3336 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3337 List<SecurityCapability>> call(controller, stub, 3338 SecurityCapabilitiesRequest.newBuilder().build(), 3339 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3340 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3341 .call(); 3342 } 3343 3344 @Override 3345 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3346 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3347 } 3348 3349 @Override 3350 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3351 TableName tableName) { 3352 Preconditions.checkNotNull(tableName, 3353 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3354 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3355 } 3356 3357 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3358 ServerName serverName) { 3359 return this.<List<RegionMetrics>> newAdminCaller() 3360 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3361 List<RegionMetrics>> adminCall(controller, stub, request, 3362 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3363 RegionMetricsBuilder::toRegionMetrics)) 3364 .serverName(serverName).call(); 3365 } 3366 3367 @Override 3368 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3369 return this.<Boolean> newMasterCaller() 3370 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3371 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3372 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3373 resp -> resp.getInMaintenanceMode())) 3374 .call(); 3375 } 3376 3377 @Override 3378 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3379 CompactType compactType) { 3380 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3381 3382 switch (compactType) { 3383 case MOB: 3384 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3385 if (err != null) { 3386 future.completeExceptionally(err); 3387 return; 3388 } 3389 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3390 3391 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3392 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3393 GetRegionInfoResponse> adminCall(controller, stub, 3394 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3395 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3396 .call(), (resp2, err2) -> { 3397 if (err2 != null) { 3398 future.completeExceptionally(err2); 3399 } else { 3400 if (resp2.hasCompactionState()) { 3401 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3402 } else { 3403 future.complete(CompactionState.NONE); 3404 } 3405 } 3406 }); 3407 }); 3408 break; 3409 case NORMAL: 3410 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3411 if (err != null) { 3412 future.completeExceptionally(err); 3413 return; 3414 } 3415 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3416 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3417 locations.stream().filter(loc -> loc.getServerName() != null) 3418 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3419 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3420 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3421 // If any region compaction state is MAJOR_AND_MINOR 3422 // the table compaction state is MAJOR_AND_MINOR, too. 3423 if (err2 != null) { 3424 future.completeExceptionally(unwrapCompletionException(err2)); 3425 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3426 future.complete(regionState); 3427 } else { 3428 regionStates.add(regionState); 3429 } 3430 })); 3431 }); 3432 addListener( 3433 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3434 (ret, err3) -> { 3435 // If future not completed, check all regions's compaction state 3436 if (!future.isCompletedExceptionally() && !future.isDone()) { 3437 CompactionState state = CompactionState.NONE; 3438 for (CompactionState regionState : regionStates) { 3439 switch (regionState) { 3440 case MAJOR: 3441 if (state == CompactionState.MINOR) { 3442 future.complete(CompactionState.MAJOR_AND_MINOR); 3443 } else { 3444 state = CompactionState.MAJOR; 3445 } 3446 break; 3447 case MINOR: 3448 if (state == CompactionState.MAJOR) { 3449 future.complete(CompactionState.MAJOR_AND_MINOR); 3450 } else { 3451 state = CompactionState.MINOR; 3452 } 3453 break; 3454 case NONE: 3455 default: 3456 } 3457 } 3458 if (!future.isDone()) { 3459 future.complete(state); 3460 } 3461 } 3462 }); 3463 }); 3464 break; 3465 default: 3466 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3467 } 3468 3469 return future; 3470 } 3471 3472 @Override 3473 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3474 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3475 addListener(getRegionLocation(regionName), (location, err) -> { 3476 if (err != null) { 3477 future.completeExceptionally(err); 3478 return; 3479 } 3480 ServerName serverName = location.getServerName(); 3481 if (serverName == null) { 3482 future 3483 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3484 return; 3485 } 3486 addListener( 3487 this.<GetRegionInfoResponse> newAdminCaller() 3488 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3489 GetRegionInfoResponse> adminCall(controller, stub, 3490 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3491 true), 3492 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3493 .serverName(serverName).call(), 3494 (resp2, err2) -> { 3495 if (err2 != null) { 3496 future.completeExceptionally(err2); 3497 } else { 3498 if (resp2.hasCompactionState()) { 3499 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3500 } else { 3501 future.complete(CompactionState.NONE); 3502 } 3503 } 3504 }); 3505 }); 3506 return future; 3507 } 3508 3509 @Override 3510 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3511 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3512 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3513 return this.<Optional<Long>> newMasterCaller() 3514 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3515 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3516 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3517 ProtobufUtil::toOptionalTimestamp)) 3518 .call(); 3519 } 3520 3521 @Override 3522 public CompletableFuture<Optional<Long>> 3523 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3524 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3525 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3526 addListener(getRegionInfo(regionName), (region, err) -> { 3527 if (err != null) { 3528 future.completeExceptionally(err); 3529 return; 3530 } 3531 MajorCompactionTimestampForRegionRequest.Builder builder = 3532 MajorCompactionTimestampForRegionRequest.newBuilder(); 3533 builder.setRegion( 3534 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3535 addListener(this.<Optional<Long>> newMasterCaller() 3536 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3537 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3538 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3539 ProtobufUtil::toOptionalTimestamp)) 3540 .call(), (timestamp, err2) -> { 3541 if (err2 != null) { 3542 future.completeExceptionally(err2); 3543 } else { 3544 future.complete(timestamp); 3545 } 3546 }); 3547 }); 3548 return future; 3549 } 3550 3551 @Override 3552 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3553 List<String> serverNamesList) { 3554 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3555 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3556 if (err != null) { 3557 future.completeExceptionally(err); 3558 return; 3559 } 3560 // Accessed by multiple threads. 3561 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3562 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3563 serverNames.stream().forEach(serverName -> { 3564 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3565 if (err2 != null) { 3566 future.completeExceptionally(unwrapCompletionException(err2)); 3567 } else { 3568 serverStates.put(serverName, serverState); 3569 } 3570 })); 3571 }); 3572 addListener( 3573 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3574 (ret, err3) -> { 3575 if (!future.isCompletedExceptionally()) { 3576 if (err3 != null) { 3577 future.completeExceptionally(err3); 3578 } else { 3579 future.complete(serverStates); 3580 } 3581 } 3582 }); 3583 }); 3584 return future; 3585 } 3586 3587 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3588 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3589 if (serverNamesList.isEmpty()) { 3590 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3591 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3592 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3593 if (err != null) { 3594 future.completeExceptionally(err); 3595 } else { 3596 future.complete(clusterMetrics.getServersName()); 3597 } 3598 }); 3599 return future; 3600 } else { 3601 List<ServerName> serverList = new ArrayList<>(); 3602 for (String regionServerName : serverNamesList) { 3603 ServerName serverName = null; 3604 try { 3605 serverName = ServerName.valueOf(regionServerName); 3606 } catch (Exception e) { 3607 future.completeExceptionally( 3608 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3609 } 3610 if (serverName == null) { 3611 future.completeExceptionally( 3612 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3613 } else { 3614 serverList.add(serverName); 3615 } 3616 } 3617 future.complete(serverList); 3618 } 3619 return future; 3620 } 3621 3622 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3623 return this.<Boolean> newAdminCaller().serverName(serverName) 3624 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3625 Boolean> adminCall(controller, stub, 3626 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3627 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3628 .call(); 3629 } 3630 3631 @Override 3632 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3633 return this.<Boolean> newMasterCaller() 3634 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3635 Boolean> call(controller, stub, 3636 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3637 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3638 (resp) -> resp.getPrevBalanceValue())) 3639 .call(); 3640 } 3641 3642 @Override 3643 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3644 return this.<BalanceResponse> newMasterCaller() 3645 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3646 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3647 (s, c, req, done) -> s.balance(c, req, done), 3648 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3649 .call(); 3650 } 3651 3652 @Override 3653 public CompletableFuture<Boolean> isBalancerEnabled() { 3654 return this.<Boolean> newMasterCaller() 3655 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3656 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3657 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3658 .call(); 3659 } 3660 3661 @Override 3662 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3663 return this.<Boolean> newMasterCaller() 3664 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3665 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3666 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3667 (resp) -> resp.getPrevNormalizerValue())) 3668 .call(); 3669 } 3670 3671 @Override 3672 public CompletableFuture<Boolean> isNormalizerEnabled() { 3673 return this.<Boolean> newMasterCaller() 3674 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3675 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3676 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3677 .call(); 3678 } 3679 3680 @Override 3681 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3682 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3683 } 3684 3685 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3686 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3687 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3688 } 3689 3690 @Override 3691 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3692 return this.<Boolean> newMasterCaller() 3693 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3694 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3695 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3696 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3697 (resp) -> resp.getPrevValue())) 3698 .call(); 3699 } 3700 3701 @Override 3702 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3703 return this.<Boolean> newMasterCaller() 3704 .action( 3705 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3706 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3707 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3708 .call(); 3709 } 3710 3711 @Override 3712 public CompletableFuture<Boolean> runCleanerChore() { 3713 return this.<Boolean> newMasterCaller() 3714 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3715 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3716 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3717 (resp) -> resp.getCleanerChoreRan())) 3718 .call(); 3719 } 3720 3721 @Override 3722 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3723 return this.<Boolean> newMasterCaller() 3724 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3725 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3726 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3727 .call(); 3728 } 3729 3730 @Override 3731 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3732 return this.<Boolean> newMasterCaller() 3733 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3734 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3735 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3736 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3737 .call(); 3738 } 3739 3740 @Override 3741 public CompletableFuture<Integer> runCatalogJanitor() { 3742 return this.<Integer> newMasterCaller() 3743 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3744 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3745 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3746 .call(); 3747 } 3748 3749 @Override 3750 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3751 ServiceCaller<S, R> callable) { 3752 MasterCoprocessorRpcChannelImpl channel = 3753 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3754 S stub = stubMaker.apply(channel); 3755 CompletableFuture<R> future = new CompletableFuture<>(); 3756 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3757 callable.call(stub, controller, resp -> { 3758 if (controller.failed()) { 3759 future.completeExceptionally(controller.getFailed()); 3760 } else { 3761 future.complete(resp); 3762 } 3763 }); 3764 return future; 3765 } 3766 3767 @Override 3768 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3769 ServiceCaller<S, R> callable, ServerName serverName) { 3770 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3771 this.<Message> newServerCaller().serverName(serverName)); 3772 S stub = stubMaker.apply(channel); 3773 CompletableFuture<R> future = new CompletableFuture<>(); 3774 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3775 callable.call(stub, controller, resp -> { 3776 if (controller.failed()) { 3777 future.completeExceptionally(controller.getFailed()); 3778 } else { 3779 future.complete(resp); 3780 } 3781 }); 3782 return future; 3783 } 3784 3785 @Override 3786 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3787 return this.<List<ServerName>> newMasterCaller() 3788 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3789 List<ServerName>> call(controller, stub, 3790 RequestConverter.buildClearDeadServersRequest(servers), 3791 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3792 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3793 .call(); 3794 } 3795 3796 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3797 return this.connection.callerFactory.<T> serverRequest() 3798 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3799 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3800 .pause(pauseNs, TimeUnit.NANOSECONDS) 3801 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3802 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3803 } 3804 3805 @Override 3806 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3807 if (tableName == null) { 3808 return failedFuture(new IllegalArgumentException("Table name is null")); 3809 } 3810 CompletableFuture<Void> future = new CompletableFuture<>(); 3811 addListener(tableExists(tableName), (exist, err) -> { 3812 if (err != null) { 3813 future.completeExceptionally(err); 3814 return; 3815 } 3816 if (!exist) { 3817 future.completeExceptionally(new TableNotFoundException( 3818 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3819 return; 3820 } 3821 addListener(getTableSplits(tableName), (splits, err1) -> { 3822 if (err1 != null) { 3823 future.completeExceptionally(err1); 3824 } else { 3825 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3826 if (err2 != null) { 3827 future.completeExceptionally(err2); 3828 } else { 3829 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3830 if (err3 != null) { 3831 future.completeExceptionally(err3); 3832 } else { 3833 future.complete(result3); 3834 } 3835 }); 3836 } 3837 }); 3838 } 3839 }); 3840 }); 3841 return future; 3842 } 3843 3844 @Override 3845 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3846 if (tableName == null) { 3847 return failedFuture(new IllegalArgumentException("Table name is null")); 3848 } 3849 CompletableFuture<Void> future = new CompletableFuture<>(); 3850 addListener(tableExists(tableName), (exist, err) -> { 3851 if (err != null) { 3852 future.completeExceptionally(err); 3853 return; 3854 } 3855 if (!exist) { 3856 future.completeExceptionally(new TableNotFoundException( 3857 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3858 return; 3859 } 3860 addListener(setTableReplication(tableName, false), (result, err2) -> { 3861 if (err2 != null) { 3862 future.completeExceptionally(err2); 3863 } else { 3864 future.complete(result); 3865 } 3866 }); 3867 }); 3868 return future; 3869 } 3870 3871 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3872 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3873 addListener( 3874 getRegions(tableName).thenApply(regions -> regions.stream() 3875 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3876 (regions, err2) -> { 3877 if (err2 != null) { 3878 future.completeExceptionally(err2); 3879 return; 3880 } 3881 if (regions.size() == 1) { 3882 future.complete(null); 3883 } else { 3884 byte[][] splits = new byte[regions.size() - 1][]; 3885 for (int i = 1; i < regions.size(); i++) { 3886 splits[i - 1] = regions.get(i).getStartKey(); 3887 } 3888 future.complete(splits); 3889 } 3890 }); 3891 return future; 3892 } 3893 3894 /** 3895 * Connect to peer and check the table descriptor on peer: 3896 * <ol> 3897 * <li>Create the same table on peer when not exist.</li> 3898 * <li>Throw an exception if the table already has replication enabled on any of the column 3899 * families.</li> 3900 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3901 * </ol> 3902 * @param tableName name of the table to sync to the peer 3903 * @param splits table split keys 3904 */ 3905 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3906 byte[][] splits) { 3907 CompletableFuture<Void> future = new CompletableFuture<>(); 3908 addListener(listReplicationPeers(), (peers, err) -> { 3909 if (err != null) { 3910 future.completeExceptionally(err); 3911 return; 3912 } 3913 if (peers == null || peers.size() <= 0) { 3914 future.completeExceptionally( 3915 new IllegalArgumentException("Found no peer cluster for replication.")); 3916 return; 3917 } 3918 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3919 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3920 .forEach(peer -> { 3921 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3922 }); 3923 addListener( 3924 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3925 (result, err2) -> { 3926 if (err2 != null) { 3927 future.completeExceptionally(err2); 3928 } else { 3929 future.complete(result); 3930 } 3931 }); 3932 }); 3933 return future; 3934 } 3935 3936 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3937 ReplicationPeerDescription peer) { 3938 Configuration peerConf; 3939 try { 3940 peerConf = ReplicationPeerConfigUtil 3941 .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig()); 3942 } catch (IOException e) { 3943 return failedFuture(e); 3944 } 3945 URI connectionUri = 3946 ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey()); 3947 CompletableFuture<Void> future = new CompletableFuture<>(); 3948 addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> { 3949 if (err != null) { 3950 future.completeExceptionally(err); 3951 return; 3952 } 3953 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3954 if (err1 != null) { 3955 future.completeExceptionally(err1); 3956 return; 3957 } 3958 AsyncAdmin peerAdmin = conn.getAdmin(); 3959 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3960 if (err2 != null) { 3961 future.completeExceptionally(err2); 3962 return; 3963 } 3964 if (!exist) { 3965 CompletableFuture<Void> createTableFuture = null; 3966 if (splits == null) { 3967 createTableFuture = peerAdmin.createTable(tableDesc); 3968 } else { 3969 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3970 } 3971 addListener(createTableFuture, (result, err3) -> { 3972 if (err3 != null) { 3973 future.completeExceptionally(err3); 3974 } else { 3975 future.complete(result); 3976 } 3977 }); 3978 } else { 3979 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3980 (result, err4) -> { 3981 if (err4 != null) { 3982 future.completeExceptionally(err4); 3983 } else { 3984 future.complete(result); 3985 } 3986 }); 3987 } 3988 }); 3989 }); 3990 }); 3991 return future; 3992 } 3993 3994 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3995 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3996 CompletableFuture<Void> future = new CompletableFuture<>(); 3997 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3998 if (err != null) { 3999 future.completeExceptionally(err); 4000 return; 4001 } 4002 if (peerTableDesc == null) { 4003 future.completeExceptionally( 4004 new IllegalArgumentException("Failed to get table descriptor for table " 4005 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 4006 return; 4007 } 4008 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 4009 future.completeExceptionally(new IllegalArgumentException( 4010 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 4011 + ", but the table descriptors are not same when compared with source cluster." 4012 + " Thus can not enable the table's replication switch.")); 4013 return; 4014 } 4015 future.complete(null); 4016 }); 4017 return future; 4018 } 4019 4020 /** 4021 * Set the table's replication switch if the table's replication switch is already not set. 4022 * @param tableName name of the table 4023 * @param enableRep is replication switch enable or disable 4024 */ 4025 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 4026 CompletableFuture<Void> future = new CompletableFuture<>(); 4027 addListener(getDescriptor(tableName), (tableDesc, err) -> { 4028 if (err != null) { 4029 future.completeExceptionally(err); 4030 return; 4031 } 4032 if (!tableDesc.matchReplicationScope(enableRep)) { 4033 int scope = 4034 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 4035 TableDescriptor newTableDesc = 4036 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 4037 addListener(modifyTable(newTableDesc), (result, err2) -> { 4038 if (err2 != null) { 4039 future.completeExceptionally(err2); 4040 } else { 4041 future.complete(result); 4042 } 4043 }); 4044 } else { 4045 future.complete(null); 4046 } 4047 }); 4048 return future; 4049 } 4050 4051 @Override 4052 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 4053 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 4054 request.setPeerId(peerId); 4055 return this.<Boolean> newMasterCaller() 4056 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 4057 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 4058 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 4059 resp -> resp.getIsEnabled())) 4060 .call(); 4061 } 4062 4063 private void waitUntilAllReplicationPeerModificationProceduresDone( 4064 CompletableFuture<Boolean> future, boolean prevOn, int retries) { 4065 CompletableFuture<List<ProcedureProtos.Procedure>> callFuture = 4066 this.<List<ProcedureProtos.Procedure>> newMasterCaller() 4067 .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest, 4068 GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call( 4069 controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), 4070 (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), 4071 resp -> resp.getProcedureList())) 4072 .call(); 4073 addListener(callFuture, (r, e) -> { 4074 if (e != null) { 4075 future.completeExceptionally(e); 4076 } else if (r.isEmpty()) { 4077 // we are done 4078 future.complete(prevOn); 4079 } else { 4080 // retry later to see if the procedures are done 4081 retryTimer.newTimeout( 4082 t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), 4083 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 4084 } 4085 }); 4086 } 4087 4088 @Override 4089 public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, 4090 boolean drainProcedures) { 4091 ReplicationPeerModificationSwitchRequest request = 4092 ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); 4093 CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller() 4094 .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest, 4095 ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request, 4096 (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), 4097 resp -> resp.getPreviousValue())) 4098 .call(); 4099 // if we do not need to wait all previous peer modification procedure done, or we are enabling 4100 // peer modification, just return here. 4101 if (!drainProcedures || on) { 4102 return callFuture; 4103 } 4104 // otherwise we need to wait until all previous peer modification procedure done 4105 CompletableFuture<Boolean> future = new CompletableFuture<>(); 4106 addListener(callFuture, (prevOn, err) -> { 4107 if (err != null) { 4108 future.completeExceptionally(err); 4109 return; 4110 } 4111 // even if the previous state is disabled, we still need to wait here, as there could be 4112 // another client thread which called this method just before us and have already changed the 4113 // state to off, but there are still peer modification procedures not finished, so we should 4114 // also wait here. 4115 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 4116 }); 4117 return future; 4118 } 4119 4120 @Override 4121 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 4122 return this.<Boolean> newMasterCaller() 4123 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 4124 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 4125 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 4126 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 4127 (resp) -> resp.getEnabled())) 4128 .call(); 4129 } 4130 4131 @Override 4132 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 4133 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 4134 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 4135 if (err != null) { 4136 future.completeExceptionally(err); 4137 return; 4138 } 4139 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 4140 locations.stream().filter(l -> l.getRegion() != null) 4141 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 4142 .collect(Collectors.groupingBy(l -> l.getServerName(), 4143 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 4144 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 4145 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 4146 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 4147 futures 4148 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 4149 if (err2 != null) { 4150 future.completeExceptionally(unwrapCompletionException(err2)); 4151 } else { 4152 aggregator.append(stats); 4153 } 4154 })); 4155 } 4156 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 4157 (ret, err3) -> { 4158 if (err3 != null) { 4159 future.completeExceptionally(unwrapCompletionException(err3)); 4160 } else { 4161 future.complete(aggregator.sum()); 4162 } 4163 }); 4164 }); 4165 return future; 4166 } 4167 4168 @Override 4169 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 4170 boolean preserveSplits) { 4171 CompletableFuture<Void> future = new CompletableFuture<>(); 4172 addListener(tableExists(tableName), (exist, err) -> { 4173 if (err != null) { 4174 future.completeExceptionally(err); 4175 return; 4176 } 4177 if (!exist) { 4178 future.completeExceptionally(new TableNotFoundException(tableName)); 4179 return; 4180 } 4181 addListener(tableExists(newTableName), (exist1, err1) -> { 4182 if (err1 != null) { 4183 future.completeExceptionally(err1); 4184 return; 4185 } 4186 if (exist1) { 4187 future.completeExceptionally(new TableExistsException(newTableName)); 4188 return; 4189 } 4190 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 4191 if (err2 != null) { 4192 future.completeExceptionally(err2); 4193 return; 4194 } 4195 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 4196 if (preserveSplits) { 4197 addListener(getTableSplits(tableName), (splits, err3) -> { 4198 if (err3 != null) { 4199 future.completeExceptionally(err3); 4200 } else { 4201 addListener( 4202 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 4203 (result, err4) -> { 4204 if (err4 != null) { 4205 future.completeExceptionally(err4); 4206 } else { 4207 future.complete(result); 4208 } 4209 }); 4210 } 4211 }); 4212 } else { 4213 addListener(createTable(newTableDesc), (result, err5) -> { 4214 if (err5 != null) { 4215 future.completeExceptionally(err5); 4216 } else { 4217 future.complete(result); 4218 } 4219 }); 4220 } 4221 }); 4222 }); 4223 }); 4224 return future; 4225 } 4226 4227 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 4228 List<RegionInfo> hris) { 4229 return this.<CacheEvictionStats> newAdminCaller() 4230 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 4231 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 4232 RequestConverter.buildClearRegionBlockCacheRequest(hris), 4233 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 4234 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 4235 .serverName(serverName).call(); 4236 } 4237 4238 @Override 4239 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 4240 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4241 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 4242 Boolean> call(controller, stub, 4243 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 4244 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 4245 resp -> resp.getPreviousRpcThrottleEnabled())) 4246 .call(); 4247 return future; 4248 } 4249 4250 @Override 4251 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 4252 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4253 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 4254 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 4255 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 4256 resp -> resp.getRpcThrottleEnabled())) 4257 .call(); 4258 return future; 4259 } 4260 4261 @Override 4262 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 4263 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4264 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 4265 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 4266 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 4267 .build(), 4268 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 4269 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 4270 .call(); 4271 return future; 4272 } 4273 4274 @Override 4275 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 4276 return this.<Map<TableName, Long>> newMasterCaller() 4277 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 4278 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4279 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4280 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4281 resp -> resp.getSizesList().stream().collect(Collectors 4282 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4283 .call(); 4284 } 4285 4286 @Override 4287 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4288 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4289 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4290 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4291 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4292 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4293 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4294 resp -> resp.getSnapshotsList().stream() 4295 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 4296 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4297 .serverName(serverName).call(); 4298 } 4299 4300 private CompletableFuture<SpaceQuotaSnapshot> 4301 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4302 return this.<SpaceQuotaSnapshot> newMasterCaller() 4303 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4304 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4305 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4306 .call(); 4307 } 4308 4309 @Override 4310 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4311 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4312 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4313 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4314 } 4315 4316 @Override 4317 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4318 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4319 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4320 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4321 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4322 } 4323 4324 @Override 4325 public CompletableFuture<Void> grant(UserPermission userPermission, 4326 boolean mergeExistingPermissions) { 4327 return this.<Void> newMasterCaller() 4328 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4329 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4330 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4331 .call(); 4332 } 4333 4334 @Override 4335 public CompletableFuture<Void> revoke(UserPermission userPermission) { 4336 return this.<Void> newMasterCaller() 4337 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4338 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4339 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4340 .call(); 4341 } 4342 4343 @Override 4344 public CompletableFuture<List<UserPermission>> 4345 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4346 return this.<List<UserPermission>> newMasterCaller() 4347 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4348 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4349 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4350 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4351 resp -> resp.getUserPermissionList().stream() 4352 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4353 .collect(Collectors.toList()))) 4354 .call(); 4355 } 4356 4357 @Override 4358 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4359 List<Permission> permissions) { 4360 return this.<List<Boolean>> newMasterCaller() 4361 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4362 List<Boolean>> call(controller, stub, 4363 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4364 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4365 resp -> resp.getHasUserPermissionList())) 4366 .call(); 4367 } 4368 4369 @Override 4370 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4371 return this.<Boolean> newMasterCaller() 4372 .action((controller, stub) -> this.call(controller, stub, 4373 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4374 MasterService.Interface::switchSnapshotCleanup, 4375 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4376 .call(); 4377 } 4378 4379 @Override 4380 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4381 return this.<Boolean> newMasterCaller() 4382 .action((controller, stub) -> this.call(controller, stub, 4383 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4384 MasterService.Interface::isSnapshotCleanupEnabled, 4385 IsSnapshotCleanupEnabledResponse::getEnabled)) 4386 .call(); 4387 } 4388 4389 @Override 4390 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4391 return this.<Void> newMasterCaller() 4392 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4393 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4394 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4395 .call(); 4396 } 4397 4398 @Override 4399 public CompletableFuture<Void> addRSGroup(String groupName) { 4400 return this.<Void> newMasterCaller() 4401 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4402 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4403 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4404 .call(); 4405 } 4406 4407 @Override 4408 public CompletableFuture<Void> removeRSGroup(String groupName) { 4409 return this.<Void> newMasterCaller() 4410 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4411 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4412 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4413 .call(); 4414 } 4415 4416 @Override 4417 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4418 BalanceRequest request) { 4419 return this.<BalanceResponse> newMasterCaller() 4420 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4421 BalanceResponse> call(controller, stub, 4422 ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4423 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4424 .call(); 4425 } 4426 4427 @Override 4428 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4429 return this.<List<RSGroupInfo>> newMasterCaller() 4430 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4431 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4432 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4433 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4434 .call(); 4435 } 4436 4437 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4438 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4439 final String logType) { 4440 if (CollectionUtils.isEmpty(serverNames)) { 4441 return CompletableFuture.completedFuture(Collections.emptyList()); 4442 } 4443 return CompletableFuture.supplyAsync(() -> serverNames 4444 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4445 filterParams, limit, logType)) 4446 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4447 } 4448 4449 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4450 Map<String, Object> filterParams, int limit, String logType) { 4451 return this.<List<LogEntry>> newAdminCaller() 4452 .action((controller, stub) -> this.adminCall(controller, stub, 4453 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4454 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4455 .serverName(serverName).call(); 4456 } 4457 4458 @Override 4459 public CompletableFuture<List<Boolean>> 4460 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4461 if (CollectionUtils.isEmpty(serverNames)) { 4462 return CompletableFuture.completedFuture(Collections.emptyList()); 4463 } 4464 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4465 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4466 return convertToFutureOfList(clearSlowLogResponseList); 4467 } 4468 4469 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4470 return this.<Boolean> newAdminCaller() 4471 .action((controller, stub) -> this.adminCall(controller, stub, 4472 RequestConverter.buildClearSlowLogResponseRequest(), 4473 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4474 .serverName(serverName).call(); 4475 } 4476 4477 private static <T> CompletableFuture<List<T>> 4478 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4479 CompletableFuture<Void> allDoneFuture = 4480 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4481 return allDoneFuture 4482 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4483 } 4484 4485 @Override 4486 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4487 return this.<List<TableName>> newMasterCaller() 4488 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4489 List<TableName>> call(controller, stub, 4490 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4491 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4492 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4493 .call(); 4494 } 4495 4496 @Override 4497 public CompletableFuture<Pair<List<String>, List<TableName>>> 4498 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4499 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4500 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4501 GetConfiguredNamespacesAndTablesInRSGroupResponse, 4502 Pair<List<String>, List<TableName>>> call(controller, stub, 4503 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4504 .build(), 4505 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4506 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4507 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4508 .call(); 4509 } 4510 4511 @Override 4512 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4513 return this.<RSGroupInfo> newMasterCaller() 4514 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4515 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4516 GetRSGroupInfoOfServerRequest.newBuilder() 4517 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4518 .setPort(hostPort.getPort()).build()) 4519 .build(), 4520 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4521 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4522 .call(); 4523 } 4524 4525 @Override 4526 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4527 return this.<Void> newMasterCaller() 4528 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4529 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4530 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4531 .call(); 4532 } 4533 4534 @Override 4535 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4536 CompletableFuture<Void> future = new CompletableFuture<>(); 4537 for (TableName tableName : tables) { 4538 addListener(tableExists(tableName), (exist, err) -> { 4539 if (err != null) { 4540 future.completeExceptionally(err); 4541 return; 4542 } 4543 if (!exist) { 4544 future.completeExceptionally(new TableNotFoundException(tableName)); 4545 return; 4546 } 4547 }); 4548 } 4549 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4550 if (err != null) { 4551 future.completeExceptionally(err); 4552 return; 4553 } 4554 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4555 future.complete(null); 4556 return; 4557 } 4558 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4559 for (TableDescriptor td : tableDescriptions) { 4560 newTableDescriptors 4561 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4562 } 4563 addListener( 4564 CompletableFuture.allOf( 4565 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4566 (v, e) -> { 4567 if (e != null) { 4568 future.completeExceptionally(e); 4569 } else { 4570 future.complete(v); 4571 } 4572 }); 4573 }); 4574 return future; 4575 } 4576 4577 @Override 4578 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4579 return this.<RSGroupInfo> newMasterCaller() 4580 .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4581 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4582 GetRSGroupInfoOfTableRequest.newBuilder() 4583 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4584 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4585 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4586 .call(); 4587 } 4588 4589 @Override 4590 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4591 return this.<RSGroupInfo> newMasterCaller() 4592 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4593 RSGroupInfo> call(controller, stub, 4594 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4595 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4596 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4597 .call(); 4598 } 4599 4600 @Override 4601 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4602 return this.<Void> newMasterCaller() 4603 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4604 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4605 .setNewRsgroupName(newName).build(), 4606 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4607 .call(); 4608 } 4609 4610 @Override 4611 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4612 Map<String, String> configuration) { 4613 UpdateRSGroupConfigRequest.Builder request = 4614 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4615 if (configuration != null) { 4616 configuration.entrySet().forEach(e -> request.addConfiguration( 4617 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4618 } 4619 return this.<Void> newMasterCaller() 4620 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4621 Void> call(controller, stub, request.build(), 4622 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4623 .call(); 4624 } 4625 4626 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4627 return this.<List<LogEntry>> newMasterCaller() 4628 .action((controller, stub) -> this.call(controller, stub, 4629 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4630 ProtobufUtil::toBalancerDecisionResponse)) 4631 .call(); 4632 } 4633 4634 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4635 return this.<List<LogEntry>> newMasterCaller() 4636 .action((controller, stub) -> this.call(controller, stub, 4637 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4638 ProtobufUtil::toBalancerRejectionResponse)) 4639 .call(); 4640 } 4641 4642 @Override 4643 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4644 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4645 if (logType == null || serverType == null) { 4646 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4647 } 4648 switch (logType) { 4649 case "SLOW_LOG": 4650 case "LARGE_LOG": 4651 if (ServerType.MASTER.equals(serverType)) { 4652 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4653 } 4654 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4655 case "BALANCER_DECISION": 4656 if (ServerType.REGION_SERVER.equals(serverType)) { 4657 throw new IllegalArgumentException( 4658 "Balancer Decision logs are not maintained by HRegionServer"); 4659 } 4660 return getBalancerDecisions(limit); 4661 case "BALANCER_REJECTION": 4662 if (ServerType.REGION_SERVER.equals(serverType)) { 4663 throw new IllegalArgumentException( 4664 "Balancer Rejection logs are not maintained by HRegionServer"); 4665 } 4666 return getBalancerRejections(limit); 4667 default: 4668 return CompletableFuture.completedFuture(Collections.emptyList()); 4669 } 4670 } 4671 4672 @Override 4673 public CompletableFuture<Void> flushMasterStore() { 4674 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4675 return this.<Void> newMasterCaller() 4676 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4677 Void> call(controller, stub, request.build(), 4678 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4679 .call(); 4680 } 4681 4682 @Override 4683 public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) { 4684 GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder(); 4685 return this.<List<String>> newAdminCaller() 4686 .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse, 4687 List<String>> adminCall(controller, stub, request.build(), 4688 (s, c, req, done) -> s.getCachedFilesList(c, req, done), 4689 resp -> resp.getCachedFilesList())) 4690 .serverName(serverName).call(); 4691 } 4692 4693 @Override 4694 public CompletableFuture<Void> restoreBackupSystemTable(String snapshotName) { 4695 MasterProtos.RestoreBackupSystemTableRequest request = 4696 MasterProtos.RestoreBackupSystemTableRequest.newBuilder().setSnapshotName(snapshotName) 4697 .build(); 4698 return this.<MasterProtos.RestoreBackupSystemTableRequest, 4699 MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request, 4700 MasterService.Interface::restoreBackupSystemTable, 4701 MasterProtos.RestoreBackupSystemTableResponse::getProcId, 4702 new RestoreBackupSystemTableProcedureBiConsumer()); 4703 } 4704 4705 private CompletableFuture<Long> internalRefershHFiles(RefreshHFilesRequest request) { 4706 return this.<Long> newMasterCaller() 4707 .action((controller, stub) -> this.<RefreshHFilesRequest, RefreshHFilesResponse, Long> call( 4708 controller, stub, request, MasterService.Interface::refreshHFiles, 4709 RefreshHFilesResponse::getProcId)) 4710 .call(); 4711 } 4712 4713 @Override 4714 public CompletableFuture<Long> refreshMeta() { 4715 RefreshMetaRequest.Builder request = RefreshMetaRequest.newBuilder(); 4716 request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()); 4717 return this.<Long> newMasterCaller() 4718 .action((controller, stub) -> this.<RefreshMetaRequest, RefreshMetaResponse, Long> call( 4719 controller, stub, request.build(), MasterService.Interface::refreshMeta, 4720 RefreshMetaResponse::getProcId)) 4721 .call(); 4722 } 4723 4724 @Override 4725 public CompletableFuture<Long> refreshHFiles(final TableName tableName) { 4726 if (tableName.isSystemTable()) { 4727 LOG.warn("Refreshing HFiles for system table {} is not allowed", tableName.getNameAsString()); 4728 throw new IllegalArgumentException( 4729 "Not allowed to refresh HFiles for system table '" + tableName.getNameAsString() + "'"); 4730 } 4731 // Request builder 4732 RefreshHFilesRequest.Builder request = RefreshHFilesRequest.newBuilder(); 4733 request.setTableName(ProtobufUtil.toProtoTableName(tableName)); 4734 request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()); 4735 return internalRefershHFiles(request.build()); 4736 } 4737 4738 @Override 4739 public CompletableFuture<Long> refreshHFiles(final String namespace) { 4740 if ( 4741 namespace.equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) 4742 || namespace.equals(NamespaceDescriptor.BACKUP_NAMESPACE_NAME_STR) 4743 ) { 4744 LOG.warn("Refreshing HFiles for reserve namespace {} is not allowed", namespace); 4745 throw new IllegalArgumentException( 4746 "Not allowed to refresh HFiles for reserve namespace '" + namespace + "'"); 4747 } 4748 // Request builder 4749 RefreshHFilesRequest.Builder request = RefreshHFilesRequest.newBuilder(); 4750 request.setNamespace(namespace); 4751 request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()); 4752 return internalRefershHFiles(request.build()); 4753 } 4754 4755 @Override 4756 public CompletableFuture<Long> refreshHFiles() { 4757 // Request builder 4758 RefreshHFilesRequest.Builder request = RefreshHFilesRequest.newBuilder(); 4759 // Set nonce 4760 request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()); 4761 return internalRefershHFiles(request.build()); 4762 } 4763}