001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.Set; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentLinkedQueue; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.function.BiConsumer; 042import java.util.function.Consumer; 043import java.util.function.Function; 044import java.util.function.Supplier; 045import java.util.regex.Pattern; 046import java.util.stream.Collectors; 047import java.util.stream.Stream; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.CacheEvictionStats; 050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 051import org.apache.hadoop.hbase.CatalogFamilyFormat; 052import org.apache.hadoop.hbase.ClientMetaTableAccessor; 053import org.apache.hadoop.hbase.ClusterMetrics; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.ClusterMetricsBuilder; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.NamespaceDescriptor; 059import org.apache.hadoop.hbase.RegionLocations; 060import org.apache.hadoop.hbase.RegionMetrics; 061import org.apache.hadoop.hbase.RegionMetricsBuilder; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableExistsException; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.TableNotDisabledException; 066import org.apache.hadoop.hbase.TableNotEnabledException; 067import org.apache.hadoop.hbase.TableNotFoundException; 068import org.apache.hadoop.hbase.UnknownRegionException; 069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.Scan.ReadType; 073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 074import org.apache.hadoop.hbase.client.replication.TableCFs; 075import org.apache.hadoop.hbase.client.security.SecurityCapability; 076import org.apache.hadoop.hbase.exceptions.DeserializationException; 077import org.apache.hadoop.hbase.ipc.HBaseRpcController; 078import org.apache.hadoop.hbase.net.Address; 079import org.apache.hadoop.hbase.quotas.QuotaFilter; 080import org.apache.hadoop.hbase.quotas.QuotaSettings; 081import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 083import org.apache.hadoop.hbase.replication.ReplicationException; 084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 086import org.apache.hadoop.hbase.replication.SyncReplicationState; 087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 089import org.apache.hadoop.hbase.security.access.Permission; 090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 091import org.apache.hadoop.hbase.security.access.UserPermission; 092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 095import org.apache.hadoop.hbase.util.Bytes; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.yetus.audience.InterfaceAudience; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 104import org.apache.hbase.thirdparty.com.google.protobuf.Message; 105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 106import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 107import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 108import org.apache.hbase.thirdparty.io.netty.util.Timeout; 109import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 110import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 111 112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 113import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 343 344/** 345 * The implementation of AsyncAdmin. 346 * <p> 347 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 348 * be finished inside the rpc framework thread, which means that the callbacks registered to the 349 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 350 * this class should not try to do time consuming tasks in the callbacks. 351 * @since 2.0.0 352 * @see AsyncHBaseAdmin 353 * @see AsyncConnection#getAdmin() 354 * @see AsyncConnection#getAdminBuilder() 355 */ 356@InterfaceAudience.Private 357class RawAsyncHBaseAdmin implements AsyncAdmin { 358 359 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 360 361 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 362 363 private final AsyncConnectionImpl connection; 364 365 private final HashedWheelTimer retryTimer; 366 367 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 368 369 private final long rpcTimeoutNs; 370 371 private final long operationTimeoutNs; 372 373 private final long pauseNs; 374 375 private final long pauseNsForServerOverloaded; 376 377 private final int maxAttempts; 378 379 private final int startLogErrorsCnt; 380 381 private final NonceGenerator ng; 382 383 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 384 AsyncAdminBuilderBase builder) { 385 this.connection = connection; 386 this.retryTimer = retryTimer; 387 this.metaTable = connection.getTable(META_TABLE_NAME); 388 this.rpcTimeoutNs = builder.rpcTimeoutNs; 389 this.operationTimeoutNs = builder.operationTimeoutNs; 390 this.pauseNs = builder.pauseNs; 391 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 392 LOG.warn( 393 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 394 + " the normal pause value {} ms, use the greater one instead", 395 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 396 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 397 this.pauseNsForServerOverloaded = builder.pauseNs; 398 } else { 399 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 400 } 401 this.maxAttempts = builder.maxAttempts; 402 this.startLogErrorsCnt = builder.startLogErrorsCnt; 403 this.ng = connection.getNonceGenerator(); 404 } 405 406 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 407 return this.connection.callerFactory.<T> masterRequest() 408 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 409 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 410 .pause(pauseNs, TimeUnit.NANOSECONDS) 411 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 412 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 413 } 414 415 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 416 return this.connection.callerFactory.<T> adminRequest() 417 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 418 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 419 .pause(pauseNs, TimeUnit.NANOSECONDS) 420 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 421 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 422 } 423 424 @FunctionalInterface 425 private interface MasterRpcCall<RESP, REQ> { 426 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 427 RpcCallback<RESP> done); 428 } 429 430 @FunctionalInterface 431 private interface AdminRpcCall<RESP, REQ> { 432 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 433 RpcCallback<RESP> done); 434 } 435 436 @FunctionalInterface 437 private interface Converter<D, S> { 438 D convert(S src) throws IOException; 439 } 440 441 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 442 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 443 Converter<RESP, PRESP> respConverter) { 444 CompletableFuture<RESP> future = new CompletableFuture<>(); 445 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 446 447 @Override 448 public void run(PRESP resp) { 449 if (controller.failed()) { 450 future.completeExceptionally(controller.getFailed()); 451 } else { 452 try { 453 future.complete(respConverter.convert(resp)); 454 } catch (IOException e) { 455 future.completeExceptionally(e); 456 } 457 } 458 } 459 }); 460 return future; 461 } 462 463 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 464 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 465 Converter<RESP, PRESP> respConverter) { 466 CompletableFuture<RESP> future = new CompletableFuture<>(); 467 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 468 469 @Override 470 public void run(PRESP resp) { 471 if (controller.failed()) { 472 future.completeExceptionally(controller.getFailed()); 473 } else { 474 try { 475 future.complete(respConverter.convert(resp)); 476 } catch (IOException e) { 477 future.completeExceptionally(e); 478 } 479 } 480 } 481 }); 482 return future; 483 } 484 485 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 486 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 487 ProcedureBiConsumer consumer) { 488 return procedureCall(b -> { 489 }, preq, rpcCall, respConverter, consumer); 490 } 491 492 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 493 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 494 ProcedureBiConsumer consumer) { 495 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 496 } 497 498 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 499 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 500 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 501 ProcedureBiConsumer consumer) { 502 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 503 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 504 prioritySetter.accept(builder); 505 CompletableFuture<Long> procFuture = builder.call(); 506 CompletableFuture<Void> future = waitProcedureResult(procFuture); 507 addListener(future, consumer); 508 return future; 509 } 510 511 @Override 512 public CompletableFuture<Boolean> tableExists(TableName tableName) { 513 if (TableName.isMetaTableName(tableName)) { 514 return CompletableFuture.completedFuture(true); 515 } 516 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 517 } 518 519 @Override 520 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 521 return getTableDescriptors( 522 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 523 } 524 525 /** 526 * {@link #listTableDescriptors(boolean)} 527 */ 528 @Override 529 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 530 boolean includeSysTables) { 531 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 532 + "use listTableDescriptors(boolean) instead"); 533 return getTableDescriptors( 534 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 535 } 536 537 @Override 538 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 539 Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, " 540 + "use listTableDescriptors(boolean) instead"); 541 if (tableNames.isEmpty()) { 542 return CompletableFuture.completedFuture(Collections.emptyList()); 543 } 544 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 545 } 546 547 private CompletableFuture<List<TableDescriptor>> 548 getTableDescriptors(GetTableDescriptorsRequest request) { 549 return this.<List<TableDescriptor>> newMasterCaller() 550 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 551 List<TableDescriptor>> call(controller, stub, request, 552 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 553 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 554 .call(); 555 } 556 557 @Override 558 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 559 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 560 } 561 562 @Override 563 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 564 boolean includeSysTables) { 565 Preconditions.checkNotNull(pattern, 566 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 567 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 568 } 569 570 @Override 571 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 572 return this.<List<TableName>> newMasterCaller() 573 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 574 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 575 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 576 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 577 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 578 .call(); 579 } 580 581 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 582 return this.<List<TableName>> newMasterCaller() 583 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 584 List<TableName>> call(controller, stub, request, 585 (s, c, req, done) -> s.getTableNames(c, req, done), 586 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 587 .call(); 588 } 589 590 @Override 591 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 592 return this.<List<TableDescriptor>> newMasterCaller() 593 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 594 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 595 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 596 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 597 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 598 .call(); 599 } 600 601 @Override 602 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 603 return this.<List<TableDescriptor>> newMasterCaller() 604 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 605 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 606 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 607 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 608 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 609 .call(); 610 } 611 612 @Override 613 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 614 return this.<List<TableName>> newMasterCaller() 615 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 616 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 617 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 618 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 619 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 620 .call(); 621 } 622 623 @Override 624 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 625 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 626 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 627 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 628 List<TableSchema>> call(controller, stub, 629 RequestConverter.buildGetTableDescriptorsRequest(tableName), 630 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 631 (resp) -> resp.getTableSchemaList())) 632 .call(), (tableSchemas, error) -> { 633 if (error != null) { 634 future.completeExceptionally(error); 635 return; 636 } 637 if (!tableSchemas.isEmpty()) { 638 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 639 } else { 640 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 641 } 642 }); 643 return future; 644 } 645 646 @Override 647 public CompletableFuture<Void> createTable(TableDescriptor desc) { 648 return createTable(desc.getTableName(), 649 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 650 } 651 652 @Override 653 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 654 int numRegions) { 655 try { 656 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 657 } catch (IllegalArgumentException e) { 658 return failedFuture(e); 659 } 660 } 661 662 @Override 663 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 664 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 665 + " use createTable(TableDescriptor) instead"); 666 try { 667 verifySplitKeys(splitKeys); 668 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 669 splitKeys, ng.getNonceGroup(), ng.newNonce())); 670 } catch (IllegalArgumentException e) { 671 return failedFuture(e); 672 } 673 } 674 675 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 676 Preconditions.checkNotNull(tableName, "table name is null"); 677 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 678 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 679 new CreateTableProcedureBiConsumer(tableName)); 680 } 681 682 @Override 683 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 684 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 685 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 686 ng.newNonce()), 687 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 688 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 689 } 690 691 @Override 692 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 693 return this.<ModifyTableStoreFileTrackerRequest, 694 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 695 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 696 ng.getNonceGroup(), ng.newNonce()), 697 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 698 (resp) -> resp.getProcId(), 699 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 700 } 701 702 @Override 703 public CompletableFuture<Void> deleteTable(TableName tableName) { 704 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 705 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 706 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 707 new DeleteTableProcedureBiConsumer(tableName)); 708 } 709 710 @Override 711 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 712 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 713 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 714 ng.newNonce()), 715 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 716 new TruncateTableProcedureBiConsumer(tableName)); 717 } 718 719 @Override 720 public CompletableFuture<Void> enableTable(TableName tableName) { 721 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 722 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 723 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 724 new EnableTableProcedureBiConsumer(tableName)); 725 } 726 727 @Override 728 public CompletableFuture<Void> disableTable(TableName tableName) { 729 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 730 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 731 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 732 new DisableTableProcedureBiConsumer(tableName)); 733 } 734 735 /** 736 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 737 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 738 * targetState). 739 */ 740 private static CompletableFuture<Boolean> completeCheckTableState( 741 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 742 TableState.State targetState, TableName tableName) { 743 if (error != null) { 744 future.completeExceptionally(error); 745 } else { 746 if (tableState != null) { 747 future.complete(tableState.inStates(targetState)); 748 } else { 749 future.completeExceptionally(new TableNotFoundException(tableName)); 750 } 751 } 752 return future; 753 } 754 755 @Override 756 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 757 if (TableName.isMetaTableName(tableName)) { 758 return CompletableFuture.completedFuture(true); 759 } 760 CompletableFuture<Boolean> future = new CompletableFuture<>(); 761 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 762 (tableState, error) -> { 763 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 764 TableState.State.ENABLED, tableName); 765 }); 766 return future; 767 } 768 769 @Override 770 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 771 if (TableName.isMetaTableName(tableName)) { 772 return CompletableFuture.completedFuture(false); 773 } 774 CompletableFuture<Boolean> future = new CompletableFuture<>(); 775 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 776 (tableState, error) -> { 777 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 778 TableState.State.DISABLED, tableName); 779 }); 780 return future; 781 } 782 783 @Override 784 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 785 if (TableName.isMetaTableName(tableName)) { 786 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 787 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 788 } 789 CompletableFuture<Boolean> future = new CompletableFuture<>(); 790 addListener(isTableEnabled(tableName), (enabled, error) -> { 791 if (error != null) { 792 if (error instanceof TableNotFoundException) { 793 future.complete(false); 794 } else { 795 future.completeExceptionally(error); 796 } 797 return; 798 } 799 if (!enabled) { 800 future.complete(false); 801 } else { 802 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 803 (locations, error1) -> { 804 if (error1 != null) { 805 future.completeExceptionally(error1); 806 return; 807 } 808 List<HRegionLocation> notDeployedRegions = locations.stream() 809 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 810 if (notDeployedRegions.size() > 0) { 811 if (LOG.isDebugEnabled()) { 812 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 813 } 814 future.complete(false); 815 return; 816 } 817 future.complete(true); 818 }); 819 } 820 }); 821 return future; 822 } 823 824 @Override 825 public CompletableFuture<Void> addColumnFamily(TableName tableName, 826 ColumnFamilyDescriptor columnFamily) { 827 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 828 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 829 ng.newNonce()), 830 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 831 new AddColumnFamilyProcedureBiConsumer(tableName)); 832 } 833 834 @Override 835 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 836 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 837 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 838 ng.newNonce()), 839 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 840 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 841 } 842 843 @Override 844 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 845 ColumnFamilyDescriptor columnFamily) { 846 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 847 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 848 ng.newNonce()), 849 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 850 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 851 } 852 853 @Override 854 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 855 byte[] family, String dstSFT) { 856 return this.<ModifyColumnStoreFileTrackerRequest, 857 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 858 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 859 ng.getNonceGroup(), ng.newNonce()), 860 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 861 (resp) -> resp.getProcId(), 862 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 863 } 864 865 @Override 866 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 867 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 868 RequestConverter.buildCreateNamespaceRequest(descriptor), 869 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 870 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 871 } 872 873 @Override 874 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 875 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 876 RequestConverter.buildModifyNamespaceRequest(descriptor), 877 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 878 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 879 } 880 881 @Override 882 public CompletableFuture<Void> deleteNamespace(String name) { 883 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 884 RequestConverter.buildDeleteNamespaceRequest(name), 885 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 886 new DeleteNamespaceProcedureBiConsumer(name)); 887 } 888 889 @Override 890 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 891 return this.<NamespaceDescriptor> newMasterCaller() 892 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 893 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 894 RequestConverter.buildGetNamespaceDescriptorRequest(name), 895 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 896 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 897 .call(); 898 } 899 900 @Override 901 public CompletableFuture<List<String>> listNamespaces() { 902 return this.<List<String>> newMasterCaller() 903 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 904 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 905 (s, c, req, done) -> s.listNamespaces(c, req, done), 906 (resp) -> resp.getNamespaceNameList())) 907 .call(); 908 } 909 910 @Override 911 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 912 return this.<List<NamespaceDescriptor>> newMasterCaller() 913 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 914 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 915 ListNamespaceDescriptorsRequest.newBuilder().build(), 916 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 917 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 918 .call(); 919 } 920 921 @Override 922 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 923 return this.<List<RegionInfo>> newAdminCaller() 924 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 925 List<RegionInfo>> adminCall(controller, stub, 926 RequestConverter.buildGetOnlineRegionRequest(), 927 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 928 resp -> ProtobufUtil.getRegionInfos(resp))) 929 .serverName(serverName).call(); 930 } 931 932 @Override 933 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 934 if (tableName.equals(META_TABLE_NAME)) { 935 return connection.registry.getMetaRegionLocations() 936 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 937 .collect(Collectors.toList())); 938 } else { 939 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 940 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 941 } 942 } 943 944 @Override 945 public CompletableFuture<Void> flush(TableName tableName) { 946 return flush(tableName, null); 947 } 948 949 @Override 950 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 951 CompletableFuture<Void> future = new CompletableFuture<>(); 952 addListener(tableExists(tableName), (exists, err) -> { 953 if (err != null) { 954 future.completeExceptionally(err); 955 } else if (!exists) { 956 future.completeExceptionally(new TableNotFoundException(tableName)); 957 } else { 958 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 959 if (err2 != null) { 960 future.completeExceptionally(err2); 961 } else if (!tableEnabled) { 962 future.completeExceptionally(new TableNotEnabledException(tableName)); 963 } else { 964 Map<String, String> props = new HashMap<>(); 965 if (columnFamily != null) { 966 props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); 967 } 968 addListener( 969 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 970 (ret, err3) -> { 971 if (err3 != null) { 972 future.completeExceptionally(err3); 973 } else { 974 future.complete(ret); 975 } 976 }); 977 } 978 }); 979 } 980 }); 981 return future; 982 } 983 984 @Override 985 public CompletableFuture<Void> flushRegion(byte[] regionName) { 986 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 987 }); 988 } 989 990 @Override 991 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 992 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 993 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 994 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 995 }); 996 } 997 998 /** 999 * This method is for internal use only, where we need the response of the flush. 1000 * <p/> 1001 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1002 * API. 1003 */ 1004 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1005 boolean writeFlushWALMarker) { 1006 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1007 addListener(getRegionLocation(regionName), (location, err) -> { 1008 if (err != null) { 1009 future.completeExceptionally(err); 1010 return; 1011 } 1012 ServerName serverName = location.getServerName(); 1013 if (serverName == null) { 1014 future 1015 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1016 return; 1017 } 1018 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1019 (ret, err2) -> { 1020 if (err2 != null) { 1021 future.completeExceptionally(err2); 1022 } else { 1023 future.complete(ret); 1024 } 1025 }); 1026 }); 1027 return future; 1028 } 1029 1030 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1031 byte[] columnFamily, boolean writeFlushWALMarker) { 1032 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1033 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1034 FlushRegionResponse> adminCall(controller, stub, 1035 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1036 writeFlushWALMarker), 1037 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1038 .call(); 1039 } 1040 1041 @Override 1042 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1043 CompletableFuture<Void> future = new CompletableFuture<>(); 1044 addListener(getRegions(sn), (hRegionInfos, err) -> { 1045 if (err != null) { 1046 future.completeExceptionally(err); 1047 return; 1048 } 1049 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1050 if (hRegionInfos != null) { 1051 hRegionInfos 1052 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1053 }))); 1054 } 1055 addListener(CompletableFuture.allOf( 1056 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1057 if (err2 != null) { 1058 future.completeExceptionally(err2); 1059 } else { 1060 future.complete(ret); 1061 } 1062 }); 1063 }); 1064 return future; 1065 } 1066 1067 @Override 1068 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1069 return compact(tableName, null, false, compactType); 1070 } 1071 1072 @Override 1073 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1074 CompactType compactType) { 1075 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1076 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1077 return compact(tableName, columnFamily, false, compactType); 1078 } 1079 1080 @Override 1081 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1082 return compactRegion(regionName, null, false); 1083 } 1084 1085 @Override 1086 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1087 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1088 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1089 return compactRegion(regionName, columnFamily, false); 1090 } 1091 1092 @Override 1093 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1094 return compact(tableName, null, true, compactType); 1095 } 1096 1097 @Override 1098 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1099 CompactType compactType) { 1100 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1101 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1102 return compact(tableName, columnFamily, true, compactType); 1103 } 1104 1105 @Override 1106 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1107 return compactRegion(regionName, null, true); 1108 } 1109 1110 @Override 1111 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1112 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1113 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1114 return compactRegion(regionName, columnFamily, true); 1115 } 1116 1117 @Override 1118 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1119 return compactRegionServer(sn, false); 1120 } 1121 1122 @Override 1123 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1124 return compactRegionServer(sn, true); 1125 } 1126 1127 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1128 CompletableFuture<Void> future = new CompletableFuture<>(); 1129 addListener(getRegions(sn), (hRegionInfos, err) -> { 1130 if (err != null) { 1131 future.completeExceptionally(err); 1132 return; 1133 } 1134 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1135 if (hRegionInfos != null) { 1136 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1137 } 1138 addListener(CompletableFuture.allOf( 1139 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1140 if (err2 != null) { 1141 future.completeExceptionally(err2); 1142 } else { 1143 future.complete(ret); 1144 } 1145 }); 1146 }); 1147 return future; 1148 } 1149 1150 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1151 boolean major) { 1152 CompletableFuture<Void> future = new CompletableFuture<>(); 1153 addListener(getRegionLocation(regionName), (location, err) -> { 1154 if (err != null) { 1155 future.completeExceptionally(err); 1156 return; 1157 } 1158 ServerName serverName = location.getServerName(); 1159 if (serverName == null) { 1160 future 1161 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1162 return; 1163 } 1164 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1165 (ret, err2) -> { 1166 if (err2 != null) { 1167 future.completeExceptionally(err2); 1168 } else { 1169 future.complete(ret); 1170 } 1171 }); 1172 }); 1173 return future; 1174 } 1175 1176 /** 1177 * List all region locations for the specific table. 1178 */ 1179 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1180 if (TableName.META_TABLE_NAME.equals(tableName)) { 1181 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1182 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1183 if (err != null) { 1184 future.completeExceptionally(err); 1185 } else if ( 1186 metaRegions == null || metaRegions.isEmpty() 1187 || metaRegions.getDefaultRegionLocation() == null 1188 ) { 1189 future.completeExceptionally(new IOException("meta region does not found")); 1190 } else { 1191 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1192 } 1193 }); 1194 return future; 1195 } else { 1196 // For non-meta table, we fetch all locations by scanning hbase:meta table 1197 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1198 } 1199 } 1200 1201 /** 1202 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1203 */ 1204 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1205 CompactType compactType) { 1206 CompletableFuture<Void> future = new CompletableFuture<>(); 1207 1208 switch (compactType) { 1209 case MOB: 1210 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1211 if (err != null) { 1212 future.completeExceptionally(err); 1213 return; 1214 } 1215 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1216 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1217 if (err2 != null) { 1218 future.completeExceptionally(err2); 1219 } else { 1220 future.complete(ret); 1221 } 1222 }); 1223 }); 1224 break; 1225 case NORMAL: 1226 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1227 if (err != null) { 1228 future.completeExceptionally(err); 1229 return; 1230 } 1231 if (locations == null || locations.isEmpty()) { 1232 future.completeExceptionally(new TableNotFoundException(tableName)); 1233 } 1234 CompletableFuture<?>[] compactFutures = 1235 locations.stream().filter(l -> l.getRegion() != null) 1236 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1237 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1238 .toArray(CompletableFuture<?>[]::new); 1239 // future complete unless all of the compact futures are completed. 1240 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1241 if (err2 != null) { 1242 future.completeExceptionally(err2); 1243 } else { 1244 future.complete(ret); 1245 } 1246 }); 1247 }); 1248 break; 1249 default: 1250 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1251 } 1252 return future; 1253 } 1254 1255 /** 1256 * Compact the region at specific region server. 1257 */ 1258 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1259 final boolean major, byte[] columnFamily) { 1260 return this.<Void> newAdminCaller().serverName(sn) 1261 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1262 Void> adminCall(controller, stub, 1263 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1264 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1265 .call(); 1266 } 1267 1268 private byte[] toEncodeRegionName(byte[] regionName) { 1269 return RegionInfo.isEncodedRegionName(regionName) 1270 ? regionName 1271 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1272 } 1273 1274 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1275 CompletableFuture<TableName> result) { 1276 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1277 if (err != null) { 1278 result.completeExceptionally(err); 1279 return; 1280 } 1281 RegionInfo regionInfo = location.getRegion(); 1282 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1283 result.completeExceptionally( 1284 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1285 return; 1286 } 1287 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1288 if (!tableName.get().equals(regionInfo.getTable())) { 1289 // tables of this two region should be same. 1290 result.completeExceptionally( 1291 new IllegalArgumentException("Cannot merge regions from two different tables " 1292 + tableName.get() + " and " + regionInfo.getTable())); 1293 } else { 1294 result.complete(tableName.get()); 1295 } 1296 } 1297 }); 1298 } 1299 1300 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1301 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1302 CompletableFuture<TableName> future = new CompletableFuture<>(); 1303 for (byte[] encodedRegionName : encodedRegionNames) { 1304 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1305 } 1306 return future; 1307 } 1308 1309 @Override 1310 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1311 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1312 } 1313 1314 @Override 1315 public CompletableFuture<Boolean> isMergeEnabled() { 1316 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1317 } 1318 1319 @Override 1320 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1321 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1322 } 1323 1324 @Override 1325 public CompletableFuture<Boolean> isSplitEnabled() { 1326 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1327 } 1328 1329 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1330 MasterSwitchType switchType) { 1331 SetSplitOrMergeEnabledRequest request = 1332 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1333 return this.<Boolean> newMasterCaller() 1334 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1335 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1336 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1337 (resp) -> resp.getPrevValueList().get(0))) 1338 .call(); 1339 } 1340 1341 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1342 IsSplitOrMergeEnabledRequest request = 1343 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1344 return this.<Boolean> newMasterCaller() 1345 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1346 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1347 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1348 .call(); 1349 } 1350 1351 @Override 1352 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1353 if (nameOfRegionsToMerge.size() < 2) { 1354 return failedFuture(new IllegalArgumentException( 1355 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1356 } 1357 CompletableFuture<Void> future = new CompletableFuture<>(); 1358 byte[][] encodedNameOfRegionsToMerge = 1359 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1360 1361 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1362 if (err != null) { 1363 future.completeExceptionally(err); 1364 return; 1365 } 1366 1367 final MergeTableRegionsRequest request; 1368 try { 1369 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1370 forcible, ng.getNonceGroup(), ng.newNonce()); 1371 } catch (DeserializationException e) { 1372 future.completeExceptionally(e); 1373 return; 1374 } 1375 1376 addListener( 1377 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1378 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1379 (ret, err2) -> { 1380 if (err2 != null) { 1381 future.completeExceptionally(err2); 1382 } else { 1383 future.complete(ret); 1384 } 1385 }); 1386 }); 1387 return future; 1388 } 1389 1390 @Override 1391 public CompletableFuture<Void> split(TableName tableName) { 1392 CompletableFuture<Void> future = new CompletableFuture<>(); 1393 addListener(tableExists(tableName), (exist, error) -> { 1394 if (error != null) { 1395 future.completeExceptionally(error); 1396 return; 1397 } 1398 if (!exist) { 1399 future.completeExceptionally(new TableNotFoundException(tableName)); 1400 return; 1401 } 1402 addListener(metaTable 1403 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1404 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1405 ClientMetaTableAccessor.QueryType.REGION)) 1406 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1407 ClientMetaTableAccessor.QueryType.REGION))), 1408 (results, err2) -> { 1409 if (err2 != null) { 1410 future.completeExceptionally(err2); 1411 return; 1412 } 1413 if (results != null && !results.isEmpty()) { 1414 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1415 for (Result r : results) { 1416 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1417 continue; 1418 } 1419 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1420 if (rl != null) { 1421 for (HRegionLocation h : rl.getRegionLocations()) { 1422 if (h != null && h.getServerName() != null) { 1423 RegionInfo hri = h.getRegion(); 1424 if ( 1425 hri == null || hri.isSplitParent() 1426 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1427 ) { 1428 continue; 1429 } 1430 splitFutures.add(split(hri, null)); 1431 } 1432 } 1433 } 1434 } 1435 addListener( 1436 CompletableFuture 1437 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1438 (ret, exception) -> { 1439 if (exception != null) { 1440 future.completeExceptionally(exception); 1441 return; 1442 } 1443 future.complete(ret); 1444 }); 1445 } else { 1446 future.complete(null); 1447 } 1448 }); 1449 }); 1450 return future; 1451 } 1452 1453 @Override 1454 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1455 CompletableFuture<Void> result = new CompletableFuture<>(); 1456 if (splitPoint == null) { 1457 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1458 } 1459 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1460 (loc, err) -> { 1461 if (err != null) { 1462 result.completeExceptionally(err); 1463 } else if (loc == null || loc.getRegion() == null) { 1464 result.completeExceptionally(new IllegalArgumentException( 1465 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1466 } else { 1467 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1468 if (err2 != null) { 1469 result.completeExceptionally(err2); 1470 } else { 1471 result.complete(ret); 1472 } 1473 1474 }); 1475 } 1476 }); 1477 return result; 1478 } 1479 1480 @Override 1481 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1482 CompletableFuture<Void> future = new CompletableFuture<>(); 1483 addListener(getRegionLocation(regionName), (location, err) -> { 1484 if (err != null) { 1485 future.completeExceptionally(err); 1486 return; 1487 } 1488 RegionInfo regionInfo = location.getRegion(); 1489 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1490 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1491 + "Replicas are auto-split when their primary is split.")); 1492 return; 1493 } 1494 ServerName serverName = location.getServerName(); 1495 if (serverName == null) { 1496 future 1497 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1498 return; 1499 } 1500 addListener(split(regionInfo, null), (ret, err2) -> { 1501 if (err2 != null) { 1502 future.completeExceptionally(err2); 1503 } else { 1504 future.complete(ret); 1505 } 1506 }); 1507 }); 1508 return future; 1509 } 1510 1511 @Override 1512 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1513 Preconditions.checkNotNull(splitPoint, 1514 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1515 CompletableFuture<Void> future = new CompletableFuture<>(); 1516 addListener(getRegionLocation(regionName), (location, err) -> { 1517 if (err != null) { 1518 future.completeExceptionally(err); 1519 return; 1520 } 1521 RegionInfo regionInfo = location.getRegion(); 1522 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1523 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1524 + "Replicas are auto-split when their primary is split.")); 1525 return; 1526 } 1527 ServerName serverName = location.getServerName(); 1528 if (serverName == null) { 1529 future 1530 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1531 return; 1532 } 1533 if ( 1534 regionInfo.getStartKey() != null 1535 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1536 ) { 1537 future.completeExceptionally( 1538 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1539 return; 1540 } 1541 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1542 if (err2 != null) { 1543 future.completeExceptionally(err2); 1544 } else { 1545 future.complete(ret); 1546 } 1547 }); 1548 }); 1549 return future; 1550 } 1551 1552 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1553 CompletableFuture<Void> future = new CompletableFuture<>(); 1554 TableName tableName = hri.getTable(); 1555 final SplitTableRegionRequest request; 1556 try { 1557 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1558 ng.newNonce()); 1559 } catch (DeserializationException e) { 1560 future.completeExceptionally(e); 1561 return future; 1562 } 1563 1564 addListener( 1565 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1566 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1567 (ret, err2) -> { 1568 if (err2 != null) { 1569 future.completeExceptionally(err2); 1570 } else { 1571 future.complete(ret); 1572 } 1573 }); 1574 return future; 1575 } 1576 1577 @Override 1578 public CompletableFuture<Void> assign(byte[] regionName) { 1579 CompletableFuture<Void> future = new CompletableFuture<>(); 1580 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1581 if (err != null) { 1582 future.completeExceptionally(err); 1583 return; 1584 } 1585 addListener( 1586 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1587 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1588 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1589 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1590 .call(), 1591 (ret, err2) -> { 1592 if (err2 != null) { 1593 future.completeExceptionally(err2); 1594 } else { 1595 future.complete(ret); 1596 } 1597 }); 1598 }); 1599 return future; 1600 } 1601 1602 @Override 1603 public CompletableFuture<Void> unassign(byte[] regionName) { 1604 CompletableFuture<Void> future = new CompletableFuture<>(); 1605 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1606 if (err != null) { 1607 future.completeExceptionally(err); 1608 return; 1609 } 1610 addListener( 1611 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1612 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1613 Void> call(controller, stub, 1614 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1615 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1616 .call(), 1617 (ret, err2) -> { 1618 if (err2 != null) { 1619 future.completeExceptionally(err2); 1620 } else { 1621 future.complete(ret); 1622 } 1623 }); 1624 }); 1625 return future; 1626 } 1627 1628 @Override 1629 public CompletableFuture<Void> offline(byte[] regionName) { 1630 CompletableFuture<Void> future = new CompletableFuture<>(); 1631 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1632 if (err != null) { 1633 future.completeExceptionally(err); 1634 return; 1635 } 1636 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1637 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1638 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1639 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1640 .call(), (ret, err2) -> { 1641 if (err2 != null) { 1642 future.completeExceptionally(err2); 1643 } else { 1644 future.complete(ret); 1645 } 1646 }); 1647 }); 1648 return future; 1649 } 1650 1651 @Override 1652 public CompletableFuture<Void> move(byte[] regionName) { 1653 CompletableFuture<Void> future = new CompletableFuture<>(); 1654 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1655 if (err != null) { 1656 future.completeExceptionally(err); 1657 return; 1658 } 1659 addListener( 1660 moveRegion(regionInfo, 1661 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1662 (ret, err2) -> { 1663 if (err2 != null) { 1664 future.completeExceptionally(err2); 1665 } else { 1666 future.complete(ret); 1667 } 1668 }); 1669 }); 1670 return future; 1671 } 1672 1673 @Override 1674 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1675 Preconditions.checkNotNull(destServerName, 1676 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1677 CompletableFuture<Void> future = new CompletableFuture<>(); 1678 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1679 if (err != null) { 1680 future.completeExceptionally(err); 1681 return; 1682 } 1683 addListener( 1684 moveRegion(regionInfo, RequestConverter 1685 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1686 (ret, err2) -> { 1687 if (err2 != null) { 1688 future.completeExceptionally(err2); 1689 } else { 1690 future.complete(ret); 1691 } 1692 }); 1693 }); 1694 return future; 1695 } 1696 1697 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1698 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1699 .action( 1700 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1701 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1702 .call(); 1703 } 1704 1705 @Override 1706 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1707 return this.<Void> newMasterCaller() 1708 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1709 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1710 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1711 .call(); 1712 } 1713 1714 @Override 1715 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1716 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1717 Scan scan = QuotaTableUtil.makeScan(filter); 1718 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1719 new AdvancedScanResultConsumer() { 1720 List<QuotaSettings> settings = new ArrayList<>(); 1721 1722 @Override 1723 public void onNext(Result[] results, ScanController controller) { 1724 for (Result result : results) { 1725 try { 1726 QuotaTableUtil.parseResultToCollection(result, settings); 1727 } catch (IOException e) { 1728 controller.terminate(); 1729 future.completeExceptionally(e); 1730 } 1731 } 1732 } 1733 1734 @Override 1735 public void onError(Throwable error) { 1736 future.completeExceptionally(error); 1737 } 1738 1739 @Override 1740 public void onComplete() { 1741 future.complete(settings); 1742 } 1743 }); 1744 return future; 1745 } 1746 1747 @Override 1748 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1749 boolean enabled) { 1750 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1751 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1752 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1753 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1754 } 1755 1756 @Override 1757 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1758 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1759 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1760 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1761 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1762 } 1763 1764 @Override 1765 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1766 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1767 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1768 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1769 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1770 } 1771 1772 @Override 1773 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1774 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1775 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1776 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1777 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1778 } 1779 1780 @Override 1781 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1782 return this.<ReplicationPeerConfig> newMasterCaller() 1783 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1784 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1785 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1786 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1787 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1788 .call(); 1789 } 1790 1791 @Override 1792 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1793 ReplicationPeerConfig peerConfig) { 1794 return this.<UpdateReplicationPeerConfigRequest, 1795 UpdateReplicationPeerConfigResponse> procedureCall( 1796 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1797 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1798 (resp) -> resp.getProcId(), 1799 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1800 } 1801 1802 @Override 1803 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1804 SyncReplicationState clusterState) { 1805 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1806 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1807 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1808 clusterState), 1809 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1810 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1811 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1812 } 1813 1814 @Override 1815 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1816 Map<TableName, List<String>> tableCfs) { 1817 if (tableCfs == null) { 1818 return failedFuture(new ReplicationException("tableCfs is null")); 1819 } 1820 1821 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1822 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1823 if (!completeExceptionally(future, error)) { 1824 ReplicationPeerConfig newPeerConfig = 1825 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1826 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1827 if (!completeExceptionally(future, error)) { 1828 future.complete(result); 1829 } 1830 }); 1831 } 1832 }); 1833 return future; 1834 } 1835 1836 @Override 1837 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1838 Map<TableName, List<String>> tableCfs) { 1839 if (tableCfs == null) { 1840 return failedFuture(new ReplicationException("tableCfs is null")); 1841 } 1842 1843 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1844 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1845 if (!completeExceptionally(future, error)) { 1846 ReplicationPeerConfig newPeerConfig = null; 1847 try { 1848 newPeerConfig = ReplicationPeerConfigUtil 1849 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1850 } catch (ReplicationException e) { 1851 future.completeExceptionally(e); 1852 return; 1853 } 1854 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1855 if (!completeExceptionally(future, error)) { 1856 future.complete(result); 1857 } 1858 }); 1859 } 1860 }); 1861 return future; 1862 } 1863 1864 @Override 1865 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1866 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1867 } 1868 1869 @Override 1870 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1871 Preconditions.checkNotNull(pattern, 1872 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1873 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1874 } 1875 1876 private CompletableFuture<List<ReplicationPeerDescription>> 1877 listReplicationPeers(ListReplicationPeersRequest request) { 1878 return this.<List<ReplicationPeerDescription>> newMasterCaller() 1879 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1880 List<ReplicationPeerDescription>> call(controller, stub, request, 1881 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1882 (resp) -> resp.getPeerDescList().stream() 1883 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1884 .collect(Collectors.toList()))) 1885 .call(); 1886 } 1887 1888 @Override 1889 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1890 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1891 addListener(listTableDescriptors(), (tables, error) -> { 1892 if (!completeExceptionally(future, error)) { 1893 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1894 tables.forEach(table -> { 1895 Map<String, Integer> cfs = new HashMap<>(); 1896 Stream.of(table.getColumnFamilies()) 1897 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1898 .forEach(column -> { 1899 cfs.put(column.getNameAsString(), column.getScope()); 1900 }); 1901 if (!cfs.isEmpty()) { 1902 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1903 } 1904 }); 1905 future.complete(replicatedTableCFs); 1906 } 1907 }); 1908 return future; 1909 } 1910 1911 @Override 1912 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1913 SnapshotProtos.SnapshotDescription snapshot = 1914 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1915 try { 1916 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1917 } catch (IllegalArgumentException e) { 1918 return failedFuture(e); 1919 } 1920 CompletableFuture<Void> future = new CompletableFuture<>(); 1921 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 1922 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 1923 addListener(this.<SnapshotResponse> newMasterCaller() 1924 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 1925 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 1926 .call(), (resp, err) -> { 1927 if (err != null) { 1928 future.completeExceptionally(err); 1929 return; 1930 } 1931 waitSnapshotFinish(snapshotDesc, future, resp); 1932 }); 1933 return future; 1934 } 1935 1936 // This is for keeping compatibility with old implementation. 1937 // If there is a procId field in the response, then the snapshot will be operated with a 1938 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 1939 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 1940 SnapshotResponse resp) { 1941 if (resp.hasProcId()) { 1942 getProcedureResult(resp.getProcId(), future, 0); 1943 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 1944 } else { 1945 long expectedTimeout = resp.getExpectedTimeout(); 1946 TimerTask pollingTask = new TimerTask() { 1947 int tries = 0; 1948 long startTime = EnvironmentEdgeManager.currentTime(); 1949 long endTime = startTime + expectedTimeout; 1950 long maxPauseTime = expectedTimeout / maxAttempts; 1951 1952 @Override 1953 public void run(Timeout timeout) throws Exception { 1954 if (EnvironmentEdgeManager.currentTime() < endTime) { 1955 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 1956 if (err2 != null) { 1957 future.completeExceptionally(err2); 1958 } else if (done) { 1959 future.complete(null); 1960 } else { 1961 // retry again after pauseTime. 1962 long pauseTime = 1963 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 1964 pauseTime = Math.min(pauseTime, maxPauseTime); 1965 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 1966 } 1967 }); 1968 } else { 1969 future 1970 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 1971 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 1972 } 1973 } 1974 }; 1975 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 1976 } 1977 } 1978 1979 @Override 1980 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 1981 return this.<Boolean> newMasterCaller() 1982 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 1983 Boolean> call(controller, stub, 1984 IsSnapshotDoneRequest.newBuilder() 1985 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 1986 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 1987 .call(); 1988 } 1989 1990 @Override 1991 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 1992 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 1993 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 1994 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 1995 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 1996 } 1997 1998 @Override 1999 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2000 boolean restoreAcl) { 2001 CompletableFuture<Void> future = new CompletableFuture<>(); 2002 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2003 if (err != null) { 2004 future.completeExceptionally(err); 2005 return; 2006 } 2007 TableName tableName = null; 2008 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2009 for (SnapshotDescription snap : snapshotDescriptions) { 2010 if (snap.getName().equals(snapshotName)) { 2011 tableName = snap.getTableName(); 2012 break; 2013 } 2014 } 2015 } 2016 if (tableName == null) { 2017 future.completeExceptionally(new RestoreSnapshotException( 2018 "Unable to find the table name for snapshot=" + snapshotName)); 2019 return; 2020 } 2021 final TableName finalTableName = tableName; 2022 addListener(tableExists(finalTableName), (exists, err2) -> { 2023 if (err2 != null) { 2024 future.completeExceptionally(err2); 2025 } else if (!exists) { 2026 // if table does not exist, then just clone snapshot into new table. 2027 completeConditionalOnFuture(future, 2028 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2029 } else { 2030 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2031 if (err4 != null) { 2032 future.completeExceptionally(err4); 2033 } else if (!disabled) { 2034 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2035 } else { 2036 completeConditionalOnFuture(future, 2037 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2038 } 2039 }); 2040 } 2041 }); 2042 }); 2043 return future; 2044 } 2045 2046 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2047 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2048 if (takeFailSafeSnapshot) { 2049 CompletableFuture<Void> future = new CompletableFuture<>(); 2050 // Step.1 Take a snapshot of the current state 2051 String failSafeSnapshotSnapshotNameFormat = 2052 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2053 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2054 final String failSafeSnapshotSnapshotName = 2055 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2056 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2057 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2058 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2059 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2060 if (err != null) { 2061 future.completeExceptionally(err); 2062 } else { 2063 // Step.2 Restore snapshot 2064 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2065 (void2, err2) -> { 2066 if (err2 != null) { 2067 // Step.3.a Something went wrong during the restore and try to rollback. 2068 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2069 restoreAcl, null), (void3, err3) -> { 2070 if (err3 != null) { 2071 future.completeExceptionally(err3); 2072 } else { 2073 String msg = 2074 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" 2075 + failSafeSnapshotSnapshotName + " succeeded."; 2076 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2077 } 2078 }); 2079 } else { 2080 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 2081 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2082 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2083 if (err3 != null) { 2084 LOG.error( 2085 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2086 err3); 2087 } 2088 future.complete(ret3); 2089 }); 2090 } 2091 }); 2092 } 2093 }); 2094 return future; 2095 } else { 2096 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2097 } 2098 } 2099 2100 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2101 CompletableFuture<T> parentFuture) { 2102 addListener(parentFuture, (res, err) -> { 2103 if (err != null) { 2104 dependentFuture.completeExceptionally(err); 2105 } else { 2106 dependentFuture.complete(res); 2107 } 2108 }); 2109 } 2110 2111 @Override 2112 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2113 boolean restoreAcl, String customSFT) { 2114 CompletableFuture<Void> future = new CompletableFuture<>(); 2115 addListener(tableExists(tableName), (exists, err) -> { 2116 if (err != null) { 2117 future.completeExceptionally(err); 2118 } else if (exists) { 2119 future.completeExceptionally(new TableExistsException(tableName)); 2120 } else { 2121 completeConditionalOnFuture(future, 2122 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2123 } 2124 }); 2125 return future; 2126 } 2127 2128 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2129 boolean restoreAcl, String customSFT) { 2130 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2131 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2132 try { 2133 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2134 } catch (IllegalArgumentException e) { 2135 return failedFuture(e); 2136 } 2137 RestoreSnapshotRequest.Builder builder = 2138 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2139 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2140 if (customSFT != null) { 2141 builder.setCustomSFT(customSFT); 2142 } 2143 return waitProcedureResult(this.<Long> newMasterCaller() 2144 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2145 Long> call(controller, stub, builder.build(), 2146 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2147 .call()); 2148 } 2149 2150 @Override 2151 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2152 return getCompletedSnapshots(null); 2153 } 2154 2155 @Override 2156 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2157 Preconditions.checkNotNull(pattern, 2158 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2159 return getCompletedSnapshots(pattern); 2160 } 2161 2162 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2163 return this.<List<SnapshotDescription>> newMasterCaller() 2164 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2165 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2166 GetCompletedSnapshotsRequest.newBuilder().build(), 2167 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2168 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2169 .call(); 2170 } 2171 2172 @Override 2173 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2174 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2175 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2176 return getCompletedSnapshots(tableNamePattern, null); 2177 } 2178 2179 @Override 2180 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2181 Pattern snapshotNamePattern) { 2182 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2183 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2184 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2185 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2186 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2187 } 2188 2189 private CompletableFuture<List<SnapshotDescription>> 2190 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2191 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2192 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2193 if (err != null) { 2194 future.completeExceptionally(err); 2195 return; 2196 } 2197 if (tableNames == null || tableNames.size() <= 0) { 2198 future.complete(Collections.emptyList()); 2199 return; 2200 } 2201 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2202 if (err2 != null) { 2203 future.completeExceptionally(err2); 2204 return; 2205 } 2206 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2207 future.complete(Collections.emptyList()); 2208 return; 2209 } 2210 future.complete(snapshotDescList.stream() 2211 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2212 .collect(Collectors.toList())); 2213 }); 2214 }); 2215 return future; 2216 } 2217 2218 @Override 2219 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2220 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2221 } 2222 2223 @Override 2224 public CompletableFuture<Void> deleteSnapshots() { 2225 return internalDeleteSnapshots(null, null); 2226 } 2227 2228 @Override 2229 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2230 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2231 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2232 return internalDeleteSnapshots(null, snapshotNamePattern); 2233 } 2234 2235 @Override 2236 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2237 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2238 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2239 return internalDeleteSnapshots(tableNamePattern, null); 2240 } 2241 2242 @Override 2243 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2244 Pattern snapshotNamePattern) { 2245 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2246 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2247 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2248 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2249 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2250 } 2251 2252 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2253 Pattern snapshotNamePattern) { 2254 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2255 if (tableNamePattern == null) { 2256 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2257 } else { 2258 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2259 } 2260 CompletableFuture<Void> future = new CompletableFuture<>(); 2261 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2262 if (err != null) { 2263 future.completeExceptionally(err); 2264 return; 2265 } 2266 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2267 future.complete(null); 2268 return; 2269 } 2270 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2271 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2272 if (e != null) { 2273 future.completeExceptionally(e); 2274 } else { 2275 future.complete(v); 2276 } 2277 }); 2278 }); 2279 return future; 2280 } 2281 2282 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2283 return this.<Void> newMasterCaller() 2284 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2285 controller, stub, 2286 DeleteSnapshotRequest.newBuilder() 2287 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2288 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2289 .call(); 2290 } 2291 2292 @Override 2293 public CompletableFuture<Void> execProcedure(String signature, String instance, 2294 Map<String, String> props) { 2295 CompletableFuture<Void> future = new CompletableFuture<>(); 2296 ProcedureDescription procDesc = 2297 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2298 addListener(this.<Long> newMasterCaller() 2299 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2300 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2301 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2302 .call(), (expectedTimeout, err) -> { 2303 if (err != null) { 2304 future.completeExceptionally(err); 2305 return; 2306 } 2307 TimerTask pollingTask = new TimerTask() { 2308 int tries = 0; 2309 long startTime = EnvironmentEdgeManager.currentTime(); 2310 long endTime = startTime + expectedTimeout; 2311 long maxPauseTime = expectedTimeout / maxAttempts; 2312 2313 @Override 2314 public void run(Timeout timeout) throws Exception { 2315 if (EnvironmentEdgeManager.currentTime() < endTime) { 2316 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2317 if (err2 != null) { 2318 future.completeExceptionally(err2); 2319 return; 2320 } 2321 if (done) { 2322 future.complete(null); 2323 } else { 2324 // retry again after pauseTime. 2325 long pauseTime = 2326 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2327 pauseTime = Math.min(pauseTime, maxPauseTime); 2328 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2329 TimeUnit.MICROSECONDS); 2330 } 2331 }); 2332 } else { 2333 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2334 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2335 } 2336 } 2337 }; 2338 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2339 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2340 }); 2341 return future; 2342 } 2343 2344 @Override 2345 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2346 Map<String, String> props) { 2347 ProcedureDescription proDesc = 2348 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2349 return this.<byte[]> newMasterCaller() 2350 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2351 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2352 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2353 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2354 .call(); 2355 } 2356 2357 @Override 2358 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2359 Map<String, String> props) { 2360 ProcedureDescription proDesc = 2361 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2362 return this.<Boolean> newMasterCaller() 2363 .action( 2364 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2365 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2366 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2367 .call(); 2368 } 2369 2370 @Override 2371 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2372 return this.<Boolean> newMasterCaller().action( 2373 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2374 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2375 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2376 .call(); 2377 } 2378 2379 @Override 2380 public CompletableFuture<String> getProcedures() { 2381 return this.<String> newMasterCaller() 2382 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2383 controller, stub, GetProceduresRequest.newBuilder().build(), 2384 (s, c, req, done) -> s.getProcedures(c, req, done), 2385 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2386 .call(); 2387 } 2388 2389 @Override 2390 public CompletableFuture<String> getLocks() { 2391 return this.<String> newMasterCaller() 2392 .action( 2393 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2394 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2395 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2396 .call(); 2397 } 2398 2399 @Override 2400 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2401 boolean offload) { 2402 return this.<Void> newMasterCaller() 2403 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2404 DecommissionRegionServersResponse, Void> call(controller, stub, 2405 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2406 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2407 .call(); 2408 } 2409 2410 @Override 2411 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2412 return this.<List<ServerName>> newMasterCaller() 2413 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2414 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2415 ListDecommissionedRegionServersRequest.newBuilder().build(), 2416 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2417 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2418 .collect(Collectors.toList()))) 2419 .call(); 2420 } 2421 2422 @Override 2423 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2424 List<byte[]> encodedRegionNames) { 2425 return this.<Void> newMasterCaller() 2426 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2427 RecommissionRegionServerResponse, Void> call(controller, stub, 2428 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2429 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2430 .call(); 2431 } 2432 2433 /** 2434 * Get the region location for the passed region name. The region name may be a full region name 2435 * or encoded region name. If the region does not found, then it'll throw an 2436 * UnknownRegionException wrapped by a {@link CompletableFuture} 2437 * @param regionNameOrEncodedRegionName region name or encoded region name 2438 * @return region location, wrapped by a {@link CompletableFuture} 2439 */ 2440 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2441 if (regionNameOrEncodedRegionName == null) { 2442 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2443 } 2444 2445 CompletableFuture<Optional<HRegionLocation>> future; 2446 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2447 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2448 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2449 // old format encodedName, should be meta region 2450 future = connection.registry.getMetaRegionLocations() 2451 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2452 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2453 } else { 2454 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2455 regionNameOrEncodedRegionName); 2456 } 2457 } else { 2458 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2459 // it needs to throw out IllegalArgumentException in case tableName is passed in. 2460 RegionInfo regionInfo; 2461 try { 2462 regionInfo = 2463 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2464 } catch (IOException ioe) { 2465 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2466 } 2467 2468 if (regionInfo.isMetaRegion()) { 2469 future = connection.registry.getMetaRegionLocations() 2470 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2471 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2472 .findFirst()); 2473 } else { 2474 future = 2475 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2476 } 2477 } 2478 2479 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2480 addListener(future, (location, err) -> { 2481 if (err != null) { 2482 returnedFuture.completeExceptionally(err); 2483 return; 2484 } 2485 if (!location.isPresent() || location.get().getRegion() == null) { 2486 returnedFuture.completeExceptionally( 2487 new UnknownRegionException("Invalid region name or encoded region name: " 2488 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2489 } else { 2490 returnedFuture.complete(location.get()); 2491 } 2492 }); 2493 return returnedFuture; 2494 } 2495 2496 /** 2497 * Get the region info for the passed region name. The region name may be a full region name or 2498 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2499 * wrapped by a {@link CompletableFuture} 2500 * @return region info, wrapped by a {@link CompletableFuture} 2501 */ 2502 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2503 if (regionNameOrEncodedRegionName == null) { 2504 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2505 } 2506 2507 if ( 2508 Bytes.equals(regionNameOrEncodedRegionName, 2509 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2510 || Bytes.equals(regionNameOrEncodedRegionName, 2511 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2512 ) { 2513 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2514 } 2515 2516 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2517 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2518 if (err != null) { 2519 future.completeExceptionally(err); 2520 } else { 2521 future.complete(location.getRegion()); 2522 } 2523 }); 2524 return future; 2525 } 2526 2527 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2528 if (numRegions < 3) { 2529 throw new IllegalArgumentException("Must create at least three regions"); 2530 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2531 throw new IllegalArgumentException("Start key must be smaller than end key"); 2532 } 2533 if (numRegions == 3) { 2534 return new byte[][] { startKey, endKey }; 2535 } 2536 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2537 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2538 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2539 } 2540 return splitKeys; 2541 } 2542 2543 private void verifySplitKeys(byte[][] splitKeys) { 2544 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2545 // Verify there are no duplicate split keys 2546 byte[] lastKey = null; 2547 for (byte[] splitKey : splitKeys) { 2548 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2549 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2550 } 2551 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2552 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2553 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2554 } 2555 lastKey = splitKey; 2556 } 2557 } 2558 2559 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2560 2561 abstract void onFinished(); 2562 2563 abstract void onError(Throwable error); 2564 2565 @Override 2566 public void accept(Void v, Throwable error) { 2567 if (error != null) { 2568 onError(error); 2569 return; 2570 } 2571 onFinished(); 2572 } 2573 } 2574 2575 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2576 protected final TableName tableName; 2577 2578 TableProcedureBiConsumer(TableName tableName) { 2579 this.tableName = tableName; 2580 } 2581 2582 abstract String getOperationType(); 2583 2584 String getDescription() { 2585 return "Operation: " + getOperationType() + ", " + "Table Name: " 2586 + tableName.getNameWithNamespaceInclAsString(); 2587 } 2588 2589 @Override 2590 void onFinished() { 2591 LOG.info(getDescription() + " completed"); 2592 } 2593 2594 @Override 2595 void onError(Throwable error) { 2596 LOG.info(getDescription() + " failed with " + error.getMessage()); 2597 } 2598 } 2599 2600 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2601 protected final String namespaceName; 2602 2603 NamespaceProcedureBiConsumer(String namespaceName) { 2604 this.namespaceName = namespaceName; 2605 } 2606 2607 abstract String getOperationType(); 2608 2609 String getDescription() { 2610 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2611 } 2612 2613 @Override 2614 void onFinished() { 2615 LOG.info(getDescription() + " completed"); 2616 } 2617 2618 @Override 2619 void onError(Throwable error) { 2620 LOG.info(getDescription() + " failed with " + error.getMessage()); 2621 } 2622 } 2623 2624 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2625 2626 CreateTableProcedureBiConsumer(TableName tableName) { 2627 super(tableName); 2628 } 2629 2630 @Override 2631 String getOperationType() { 2632 return "CREATE"; 2633 } 2634 } 2635 2636 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2637 2638 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2639 super(tableName); 2640 } 2641 2642 @Override 2643 String getOperationType() { 2644 return "MODIFY"; 2645 } 2646 } 2647 2648 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2649 extends TableProcedureBiConsumer { 2650 2651 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2652 super(tableName); 2653 } 2654 2655 @Override 2656 String getOperationType() { 2657 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2658 } 2659 } 2660 2661 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2662 2663 DeleteTableProcedureBiConsumer(TableName tableName) { 2664 super(tableName); 2665 } 2666 2667 @Override 2668 String getOperationType() { 2669 return "DELETE"; 2670 } 2671 2672 @Override 2673 void onFinished() { 2674 connection.getLocator().clearCache(this.tableName); 2675 super.onFinished(); 2676 } 2677 } 2678 2679 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2680 2681 TruncateTableProcedureBiConsumer(TableName tableName) { 2682 super(tableName); 2683 } 2684 2685 @Override 2686 String getOperationType() { 2687 return "TRUNCATE"; 2688 } 2689 } 2690 2691 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2692 2693 EnableTableProcedureBiConsumer(TableName tableName) { 2694 super(tableName); 2695 } 2696 2697 @Override 2698 String getOperationType() { 2699 return "ENABLE"; 2700 } 2701 } 2702 2703 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2704 2705 DisableTableProcedureBiConsumer(TableName tableName) { 2706 super(tableName); 2707 } 2708 2709 @Override 2710 String getOperationType() { 2711 return "DISABLE"; 2712 } 2713 } 2714 2715 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2716 2717 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2718 super(tableName); 2719 } 2720 2721 @Override 2722 String getOperationType() { 2723 return "ADD_COLUMN_FAMILY"; 2724 } 2725 } 2726 2727 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2728 2729 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2730 super(tableName); 2731 } 2732 2733 @Override 2734 String getOperationType() { 2735 return "DELETE_COLUMN_FAMILY"; 2736 } 2737 } 2738 2739 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2740 2741 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2742 super(tableName); 2743 } 2744 2745 @Override 2746 String getOperationType() { 2747 return "MODIFY_COLUMN_FAMILY"; 2748 } 2749 } 2750 2751 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2752 extends TableProcedureBiConsumer { 2753 2754 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2755 super(tableName); 2756 } 2757 2758 @Override 2759 String getOperationType() { 2760 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2761 } 2762 } 2763 2764 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2765 2766 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2767 super(namespaceName); 2768 } 2769 2770 @Override 2771 String getOperationType() { 2772 return "CREATE_NAMESPACE"; 2773 } 2774 } 2775 2776 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2777 2778 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2779 super(namespaceName); 2780 } 2781 2782 @Override 2783 String getOperationType() { 2784 return "DELETE_NAMESPACE"; 2785 } 2786 } 2787 2788 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2789 2790 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2791 super(namespaceName); 2792 } 2793 2794 @Override 2795 String getOperationType() { 2796 return "MODIFY_NAMESPACE"; 2797 } 2798 } 2799 2800 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2801 2802 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2803 super(tableName); 2804 } 2805 2806 @Override 2807 String getOperationType() { 2808 return "MERGE_REGIONS"; 2809 } 2810 } 2811 2812 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2813 2814 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2815 super(tableName); 2816 } 2817 2818 @Override 2819 String getOperationType() { 2820 return "SPLIT_REGION"; 2821 } 2822 } 2823 2824 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 2825 SnapshotProcedureBiConsumer(TableName tableName) { 2826 super(tableName); 2827 } 2828 2829 @Override 2830 String getOperationType() { 2831 return "SNAPSHOT"; 2832 } 2833 } 2834 2835 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2836 private final String peerId; 2837 private final Supplier<String> getOperation; 2838 2839 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2840 this.peerId = peerId; 2841 this.getOperation = getOperation; 2842 } 2843 2844 String getDescription() { 2845 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2846 } 2847 2848 @Override 2849 void onFinished() { 2850 LOG.info(getDescription() + " completed"); 2851 } 2852 2853 @Override 2854 void onError(Throwable error) { 2855 LOG.info(getDescription() + " failed with " + error.getMessage()); 2856 } 2857 } 2858 2859 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2860 CompletableFuture<Void> future = new CompletableFuture<>(); 2861 addListener(procFuture, (procId, error) -> { 2862 if (error != null) { 2863 future.completeExceptionally(error); 2864 return; 2865 } 2866 getProcedureResult(procId, future, 0); 2867 }); 2868 return future; 2869 } 2870 2871 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2872 addListener( 2873 this.<GetProcedureResultResponse> newMasterCaller() 2874 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 2875 GetProcedureResultResponse> call(controller, stub, 2876 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2877 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2878 .call(), 2879 (response, error) -> { 2880 if (error != null) { 2881 LOG.warn("failed to get the procedure result procId={}", procId, 2882 ConnectionUtils.translateException(error)); 2883 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2884 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2885 return; 2886 } 2887 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2888 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2889 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2890 return; 2891 } 2892 if (response.hasException()) { 2893 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2894 future.completeExceptionally(ioe); 2895 } else { 2896 future.complete(null); 2897 } 2898 }); 2899 } 2900 2901 private <T> CompletableFuture<T> failedFuture(Throwable error) { 2902 CompletableFuture<T> future = new CompletableFuture<>(); 2903 future.completeExceptionally(error); 2904 return future; 2905 } 2906 2907 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2908 if (error != null) { 2909 future.completeExceptionally(error); 2910 return true; 2911 } 2912 return false; 2913 } 2914 2915 @Override 2916 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2917 return getClusterMetrics(EnumSet.allOf(Option.class)); 2918 } 2919 2920 @Override 2921 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2922 return this.<ClusterMetrics> newMasterCaller() 2923 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 2924 ClusterMetrics> call(controller, stub, 2925 RequestConverter.buildGetClusterStatusRequest(options), 2926 (s, c, req, done) -> s.getClusterStatus(c, req, done), 2927 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 2928 .call(); 2929 } 2930 2931 @Override 2932 public CompletableFuture<Void> shutdown() { 2933 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2934 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2935 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 2936 resp -> null)) 2937 .call(); 2938 } 2939 2940 @Override 2941 public CompletableFuture<Void> stopMaster() { 2942 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2943 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 2944 controller, stub, StopMasterRequest.newBuilder().build(), 2945 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 2946 .call(); 2947 } 2948 2949 @Override 2950 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2951 StopServerRequest request = RequestConverter 2952 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 2953 return this.<Void> newAdminCaller().priority(HIGH_QOS) 2954 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2955 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2956 resp -> null)) 2957 .serverName(serverName).call(); 2958 } 2959 2960 @Override 2961 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2962 return this.<Void> newAdminCaller() 2963 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 2964 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2965 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2966 .serverName(serverName).call(); 2967 } 2968 2969 @Override 2970 public CompletableFuture<Void> updateConfiguration() { 2971 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2972 addListener( 2973 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 2974 (status, err) -> { 2975 if (err != null) { 2976 future.completeExceptionally(err); 2977 } else { 2978 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2979 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 2980 futures.add(updateConfiguration(status.getMasterName())); 2981 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2982 addListener( 2983 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2984 (result, err2) -> { 2985 if (err2 != null) { 2986 future.completeExceptionally(err2); 2987 } else { 2988 future.complete(result); 2989 } 2990 }); 2991 } 2992 }); 2993 return future; 2994 } 2995 2996 @Override 2997 public CompletableFuture<Void> updateConfiguration(String groupName) { 2998 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2999 addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3000 if (err != null) { 3001 future.completeExceptionally(err); 3002 } else if (rsGroupInfo == null) { 3003 future.completeExceptionally( 3004 new IllegalArgumentException("Group does not exist: " + groupName)); 3005 } else { 3006 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3007 if (err2 != null) { 3008 future.completeExceptionally(err2); 3009 } else { 3010 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3011 List<ServerName> groupServers = status.getServersName().stream() 3012 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3013 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3014 addListener( 3015 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3016 (result, err3) -> { 3017 if (err3 != null) { 3018 future.completeExceptionally(err3); 3019 } else { 3020 future.complete(result); 3021 } 3022 }); 3023 } 3024 }); 3025 } 3026 }); 3027 return future; 3028 } 3029 3030 @Override 3031 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3032 return this.<Void> newAdminCaller() 3033 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3034 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3035 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3036 .serverName(serverName).call(); 3037 } 3038 3039 @Override 3040 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3041 return this.<Void> newAdminCaller() 3042 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3043 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3044 RequestConverter.buildClearCompactionQueuesRequest(queues), 3045 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3046 .serverName(serverName).call(); 3047 } 3048 3049 @Override 3050 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3051 return this.<List<SecurityCapability>> newMasterCaller() 3052 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3053 List<SecurityCapability>> call(controller, stub, 3054 SecurityCapabilitiesRequest.newBuilder().build(), 3055 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3056 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3057 .call(); 3058 } 3059 3060 @Override 3061 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3062 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3063 } 3064 3065 @Override 3066 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3067 TableName tableName) { 3068 Preconditions.checkNotNull(tableName, 3069 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3070 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3071 } 3072 3073 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3074 ServerName serverName) { 3075 return this.<List<RegionMetrics>> newAdminCaller() 3076 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3077 List<RegionMetrics>> adminCall(controller, stub, request, 3078 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3079 RegionMetricsBuilder::toRegionMetrics)) 3080 .serverName(serverName).call(); 3081 } 3082 3083 @Override 3084 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3085 return this.<Boolean> newMasterCaller() 3086 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3087 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3088 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3089 resp -> resp.getInMaintenanceMode())) 3090 .call(); 3091 } 3092 3093 @Override 3094 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3095 CompactType compactType) { 3096 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3097 3098 switch (compactType) { 3099 case MOB: 3100 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3101 if (err != null) { 3102 future.completeExceptionally(err); 3103 return; 3104 } 3105 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3106 3107 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3108 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3109 GetRegionInfoResponse> adminCall(controller, stub, 3110 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3111 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3112 .call(), (resp2, err2) -> { 3113 if (err2 != null) { 3114 future.completeExceptionally(err2); 3115 } else { 3116 if (resp2.hasCompactionState()) { 3117 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3118 } else { 3119 future.complete(CompactionState.NONE); 3120 } 3121 } 3122 }); 3123 }); 3124 break; 3125 case NORMAL: 3126 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3127 if (err != null) { 3128 future.completeExceptionally(err); 3129 return; 3130 } 3131 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3132 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3133 locations.stream().filter(loc -> loc.getServerName() != null) 3134 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3135 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3136 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3137 // If any region compaction state is MAJOR_AND_MINOR 3138 // the table compaction state is MAJOR_AND_MINOR, too. 3139 if (err2 != null) { 3140 future.completeExceptionally(unwrapCompletionException(err2)); 3141 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3142 future.complete(regionState); 3143 } else { 3144 regionStates.add(regionState); 3145 } 3146 })); 3147 }); 3148 addListener( 3149 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3150 (ret, err3) -> { 3151 // If future not completed, check all regions's compaction state 3152 if (!future.isCompletedExceptionally() && !future.isDone()) { 3153 CompactionState state = CompactionState.NONE; 3154 for (CompactionState regionState : regionStates) { 3155 switch (regionState) { 3156 case MAJOR: 3157 if (state == CompactionState.MINOR) { 3158 future.complete(CompactionState.MAJOR_AND_MINOR); 3159 } else { 3160 state = CompactionState.MAJOR; 3161 } 3162 break; 3163 case MINOR: 3164 if (state == CompactionState.MAJOR) { 3165 future.complete(CompactionState.MAJOR_AND_MINOR); 3166 } else { 3167 state = CompactionState.MINOR; 3168 } 3169 break; 3170 case NONE: 3171 default: 3172 } 3173 } 3174 if (!future.isDone()) { 3175 future.complete(state); 3176 } 3177 } 3178 }); 3179 }); 3180 break; 3181 default: 3182 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3183 } 3184 3185 return future; 3186 } 3187 3188 @Override 3189 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3190 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3191 addListener(getRegionLocation(regionName), (location, err) -> { 3192 if (err != null) { 3193 future.completeExceptionally(err); 3194 return; 3195 } 3196 ServerName serverName = location.getServerName(); 3197 if (serverName == null) { 3198 future 3199 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3200 return; 3201 } 3202 addListener( 3203 this.<GetRegionInfoResponse> newAdminCaller() 3204 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3205 GetRegionInfoResponse> adminCall(controller, stub, 3206 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3207 true), 3208 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3209 .serverName(serverName).call(), 3210 (resp2, err2) -> { 3211 if (err2 != null) { 3212 future.completeExceptionally(err2); 3213 } else { 3214 if (resp2.hasCompactionState()) { 3215 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3216 } else { 3217 future.complete(CompactionState.NONE); 3218 } 3219 } 3220 }); 3221 }); 3222 return future; 3223 } 3224 3225 @Override 3226 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3227 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3228 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3229 return this.<Optional<Long>> newMasterCaller() 3230 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3231 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3232 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3233 ProtobufUtil::toOptionalTimestamp)) 3234 .call(); 3235 } 3236 3237 @Override 3238 public CompletableFuture<Optional<Long>> 3239 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3240 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3241 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3242 addListener(getRegionInfo(regionName), (region, err) -> { 3243 if (err != null) { 3244 future.completeExceptionally(err); 3245 return; 3246 } 3247 MajorCompactionTimestampForRegionRequest.Builder builder = 3248 MajorCompactionTimestampForRegionRequest.newBuilder(); 3249 builder.setRegion( 3250 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3251 addListener(this.<Optional<Long>> newMasterCaller() 3252 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3253 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3254 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3255 ProtobufUtil::toOptionalTimestamp)) 3256 .call(), (timestamp, err2) -> { 3257 if (err2 != null) { 3258 future.completeExceptionally(err2); 3259 } else { 3260 future.complete(timestamp); 3261 } 3262 }); 3263 }); 3264 return future; 3265 } 3266 3267 @Override 3268 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3269 List<String> serverNamesList) { 3270 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3271 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3272 if (err != null) { 3273 future.completeExceptionally(err); 3274 return; 3275 } 3276 // Accessed by multiple threads. 3277 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3278 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3279 serverNames.stream().forEach(serverName -> { 3280 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3281 if (err2 != null) { 3282 future.completeExceptionally(unwrapCompletionException(err2)); 3283 } else { 3284 serverStates.put(serverName, serverState); 3285 } 3286 })); 3287 }); 3288 addListener( 3289 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3290 (ret, err3) -> { 3291 if (!future.isCompletedExceptionally()) { 3292 if (err3 != null) { 3293 future.completeExceptionally(err3); 3294 } else { 3295 future.complete(serverStates); 3296 } 3297 } 3298 }); 3299 }); 3300 return future; 3301 } 3302 3303 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3304 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3305 if (serverNamesList.isEmpty()) { 3306 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3307 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3308 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3309 if (err != null) { 3310 future.completeExceptionally(err); 3311 } else { 3312 future.complete(clusterMetrics.getServersName()); 3313 } 3314 }); 3315 return future; 3316 } else { 3317 List<ServerName> serverList = new ArrayList<>(); 3318 for (String regionServerName : serverNamesList) { 3319 ServerName serverName = null; 3320 try { 3321 serverName = ServerName.valueOf(regionServerName); 3322 } catch (Exception e) { 3323 future.completeExceptionally( 3324 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3325 } 3326 if (serverName == null) { 3327 future.completeExceptionally( 3328 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3329 } else { 3330 serverList.add(serverName); 3331 } 3332 } 3333 future.complete(serverList); 3334 } 3335 return future; 3336 } 3337 3338 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3339 return this.<Boolean> newAdminCaller().serverName(serverName) 3340 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3341 Boolean> adminCall(controller, stub, 3342 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3343 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3344 .call(); 3345 } 3346 3347 @Override 3348 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3349 return this.<Boolean> newMasterCaller() 3350 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3351 Boolean> call(controller, stub, 3352 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3353 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3354 (resp) -> resp.getPrevBalanceValue())) 3355 .call(); 3356 } 3357 3358 @Override 3359 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3360 return this.<BalanceResponse> newMasterCaller() 3361 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3362 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3363 (s, c, req, done) -> s.balance(c, req, done), 3364 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3365 .call(); 3366 } 3367 3368 @Override 3369 public CompletableFuture<Boolean> isBalancerEnabled() { 3370 return this.<Boolean> newMasterCaller() 3371 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3372 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3373 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3374 .call(); 3375 } 3376 3377 @Override 3378 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3379 return this.<Boolean> newMasterCaller() 3380 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3381 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3382 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3383 (resp) -> resp.getPrevNormalizerValue())) 3384 .call(); 3385 } 3386 3387 @Override 3388 public CompletableFuture<Boolean> isNormalizerEnabled() { 3389 return this.<Boolean> newMasterCaller() 3390 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3391 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3392 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3393 .call(); 3394 } 3395 3396 @Override 3397 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3398 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3399 } 3400 3401 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3402 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3403 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3404 } 3405 3406 @Override 3407 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3408 return this.<Boolean> newMasterCaller() 3409 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3410 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3411 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3412 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3413 (resp) -> resp.getPrevValue())) 3414 .call(); 3415 } 3416 3417 @Override 3418 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3419 return this.<Boolean> newMasterCaller() 3420 .action( 3421 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3422 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3423 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3424 .call(); 3425 } 3426 3427 @Override 3428 public CompletableFuture<Boolean> runCleanerChore() { 3429 return this.<Boolean> newMasterCaller() 3430 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3431 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3432 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3433 (resp) -> resp.getCleanerChoreRan())) 3434 .call(); 3435 } 3436 3437 @Override 3438 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3439 return this.<Boolean> newMasterCaller() 3440 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3441 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3442 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3443 .call(); 3444 } 3445 3446 @Override 3447 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3448 return this.<Boolean> newMasterCaller() 3449 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3450 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3451 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3452 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3453 .call(); 3454 } 3455 3456 @Override 3457 public CompletableFuture<Integer> runCatalogJanitor() { 3458 return this.<Integer> newMasterCaller() 3459 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3460 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3461 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3462 .call(); 3463 } 3464 3465 @Override 3466 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3467 ServiceCaller<S, R> callable) { 3468 MasterCoprocessorRpcChannelImpl channel = 3469 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3470 S stub = stubMaker.apply(channel); 3471 CompletableFuture<R> future = new CompletableFuture<>(); 3472 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3473 callable.call(stub, controller, resp -> { 3474 if (controller.failed()) { 3475 future.completeExceptionally(controller.getFailed()); 3476 } else { 3477 future.complete(resp); 3478 } 3479 }); 3480 return future; 3481 } 3482 3483 @Override 3484 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3485 ServiceCaller<S, R> callable, ServerName serverName) { 3486 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3487 this.<Message> newServerCaller().serverName(serverName)); 3488 S stub = stubMaker.apply(channel); 3489 CompletableFuture<R> future = new CompletableFuture<>(); 3490 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3491 callable.call(stub, controller, resp -> { 3492 if (controller.failed()) { 3493 future.completeExceptionally(controller.getFailed()); 3494 } else { 3495 future.complete(resp); 3496 } 3497 }); 3498 return future; 3499 } 3500 3501 @Override 3502 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3503 return this.<List<ServerName>> newMasterCaller() 3504 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3505 List<ServerName>> call(controller, stub, 3506 RequestConverter.buildClearDeadServersRequest(servers), 3507 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3508 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3509 .call(); 3510 } 3511 3512 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3513 return this.connection.callerFactory.<T> serverRequest() 3514 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3515 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3516 .pause(pauseNs, TimeUnit.NANOSECONDS) 3517 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3518 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3519 } 3520 3521 @Override 3522 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3523 if (tableName == null) { 3524 return failedFuture(new IllegalArgumentException("Table name is null")); 3525 } 3526 CompletableFuture<Void> future = new CompletableFuture<>(); 3527 addListener(tableExists(tableName), (exist, err) -> { 3528 if (err != null) { 3529 future.completeExceptionally(err); 3530 return; 3531 } 3532 if (!exist) { 3533 future.completeExceptionally(new TableNotFoundException( 3534 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3535 return; 3536 } 3537 addListener(getTableSplits(tableName), (splits, err1) -> { 3538 if (err1 != null) { 3539 future.completeExceptionally(err1); 3540 } else { 3541 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3542 if (err2 != null) { 3543 future.completeExceptionally(err2); 3544 } else { 3545 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3546 if (err3 != null) { 3547 future.completeExceptionally(err3); 3548 } else { 3549 future.complete(result3); 3550 } 3551 }); 3552 } 3553 }); 3554 } 3555 }); 3556 }); 3557 return future; 3558 } 3559 3560 @Override 3561 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3562 if (tableName == null) { 3563 return failedFuture(new IllegalArgumentException("Table name is null")); 3564 } 3565 CompletableFuture<Void> future = new CompletableFuture<>(); 3566 addListener(tableExists(tableName), (exist, err) -> { 3567 if (err != null) { 3568 future.completeExceptionally(err); 3569 return; 3570 } 3571 if (!exist) { 3572 future.completeExceptionally(new TableNotFoundException( 3573 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3574 return; 3575 } 3576 addListener(setTableReplication(tableName, false), (result, err2) -> { 3577 if (err2 != null) { 3578 future.completeExceptionally(err2); 3579 } else { 3580 future.complete(result); 3581 } 3582 }); 3583 }); 3584 return future; 3585 } 3586 3587 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3588 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3589 addListener( 3590 getRegions(tableName).thenApply(regions -> regions.stream() 3591 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3592 (regions, err2) -> { 3593 if (err2 != null) { 3594 future.completeExceptionally(err2); 3595 return; 3596 } 3597 if (regions.size() == 1) { 3598 future.complete(null); 3599 } else { 3600 byte[][] splits = new byte[regions.size() - 1][]; 3601 for (int i = 1; i < regions.size(); i++) { 3602 splits[i - 1] = regions.get(i).getStartKey(); 3603 } 3604 future.complete(splits); 3605 } 3606 }); 3607 return future; 3608 } 3609 3610 /** 3611 * Connect to peer and check the table descriptor on peer: 3612 * <ol> 3613 * <li>Create the same table on peer when not exist.</li> 3614 * <li>Throw an exception if the table already has replication enabled on any of the column 3615 * families.</li> 3616 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3617 * </ol> 3618 * @param tableName name of the table to sync to the peer 3619 * @param splits table split keys 3620 */ 3621 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3622 byte[][] splits) { 3623 CompletableFuture<Void> future = new CompletableFuture<>(); 3624 addListener(listReplicationPeers(), (peers, err) -> { 3625 if (err != null) { 3626 future.completeExceptionally(err); 3627 return; 3628 } 3629 if (peers == null || peers.size() <= 0) { 3630 future.completeExceptionally( 3631 new IllegalArgumentException("Found no peer cluster for replication.")); 3632 return; 3633 } 3634 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3635 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3636 .forEach(peer -> { 3637 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3638 }); 3639 addListener( 3640 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3641 (result, err2) -> { 3642 if (err2 != null) { 3643 future.completeExceptionally(err2); 3644 } else { 3645 future.complete(result); 3646 } 3647 }); 3648 }); 3649 return future; 3650 } 3651 3652 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3653 ReplicationPeerDescription peer) { 3654 Configuration peerConf = null; 3655 try { 3656 peerConf = 3657 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3658 } catch (IOException e) { 3659 return failedFuture(e); 3660 } 3661 CompletableFuture<Void> future = new CompletableFuture<>(); 3662 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3663 if (err != null) { 3664 future.completeExceptionally(err); 3665 return; 3666 } 3667 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3668 if (err1 != null) { 3669 future.completeExceptionally(err1); 3670 return; 3671 } 3672 AsyncAdmin peerAdmin = conn.getAdmin(); 3673 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3674 if (err2 != null) { 3675 future.completeExceptionally(err2); 3676 return; 3677 } 3678 if (!exist) { 3679 CompletableFuture<Void> createTableFuture = null; 3680 if (splits == null) { 3681 createTableFuture = peerAdmin.createTable(tableDesc); 3682 } else { 3683 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3684 } 3685 addListener(createTableFuture, (result, err3) -> { 3686 if (err3 != null) { 3687 future.completeExceptionally(err3); 3688 } else { 3689 future.complete(result); 3690 } 3691 }); 3692 } else { 3693 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3694 (result, err4) -> { 3695 if (err4 != null) { 3696 future.completeExceptionally(err4); 3697 } else { 3698 future.complete(result); 3699 } 3700 }); 3701 } 3702 }); 3703 }); 3704 }); 3705 return future; 3706 } 3707 3708 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3709 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3710 CompletableFuture<Void> future = new CompletableFuture<>(); 3711 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3712 if (err != null) { 3713 future.completeExceptionally(err); 3714 return; 3715 } 3716 if (peerTableDesc == null) { 3717 future.completeExceptionally( 3718 new IllegalArgumentException("Failed to get table descriptor for table " 3719 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3720 return; 3721 } 3722 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3723 future.completeExceptionally(new IllegalArgumentException( 3724 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3725 + ", but the table descriptors are not same when compared with source cluster." 3726 + " Thus can not enable the table's replication switch.")); 3727 return; 3728 } 3729 future.complete(null); 3730 }); 3731 return future; 3732 } 3733 3734 /** 3735 * Set the table's replication switch if the table's replication switch is already not set. 3736 * @param tableName name of the table 3737 * @param enableRep is replication switch enable or disable 3738 */ 3739 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3740 CompletableFuture<Void> future = new CompletableFuture<>(); 3741 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3742 if (err != null) { 3743 future.completeExceptionally(err); 3744 return; 3745 } 3746 if (!tableDesc.matchReplicationScope(enableRep)) { 3747 int scope = 3748 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3749 TableDescriptor newTableDesc = 3750 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3751 addListener(modifyTable(newTableDesc), (result, err2) -> { 3752 if (err2 != null) { 3753 future.completeExceptionally(err2); 3754 } else { 3755 future.complete(result); 3756 } 3757 }); 3758 } else { 3759 future.complete(null); 3760 } 3761 }); 3762 return future; 3763 } 3764 3765 @Override 3766 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 3767 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 3768 request.setPeerId(peerId); 3769 return this.<Boolean> newMasterCaller() 3770 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 3771 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 3772 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 3773 resp -> resp.getIsEnabled())) 3774 .call(); 3775 } 3776 3777 @Override 3778 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3779 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3780 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3781 if (err != null) { 3782 future.completeExceptionally(err); 3783 return; 3784 } 3785 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3786 locations.stream().filter(l -> l.getRegion() != null) 3787 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3788 .collect(Collectors.groupingBy(l -> l.getServerName(), 3789 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3790 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3791 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3792 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3793 futures 3794 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3795 if (err2 != null) { 3796 future.completeExceptionally(unwrapCompletionException(err2)); 3797 } else { 3798 aggregator.append(stats); 3799 } 3800 })); 3801 } 3802 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3803 (ret, err3) -> { 3804 if (err3 != null) { 3805 future.completeExceptionally(unwrapCompletionException(err3)); 3806 } else { 3807 future.complete(aggregator.sum()); 3808 } 3809 }); 3810 }); 3811 return future; 3812 } 3813 3814 @Override 3815 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 3816 boolean preserveSplits) { 3817 CompletableFuture<Void> future = new CompletableFuture<>(); 3818 addListener(tableExists(tableName), (exist, err) -> { 3819 if (err != null) { 3820 future.completeExceptionally(err); 3821 return; 3822 } 3823 if (!exist) { 3824 future.completeExceptionally(new TableNotFoundException(tableName)); 3825 return; 3826 } 3827 addListener(tableExists(newTableName), (exist1, err1) -> { 3828 if (err1 != null) { 3829 future.completeExceptionally(err1); 3830 return; 3831 } 3832 if (exist1) { 3833 future.completeExceptionally(new TableExistsException(newTableName)); 3834 return; 3835 } 3836 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 3837 if (err2 != null) { 3838 future.completeExceptionally(err2); 3839 return; 3840 } 3841 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 3842 if (preserveSplits) { 3843 addListener(getTableSplits(tableName), (splits, err3) -> { 3844 if (err3 != null) { 3845 future.completeExceptionally(err3); 3846 } else { 3847 addListener( 3848 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 3849 (result, err4) -> { 3850 if (err4 != null) { 3851 future.completeExceptionally(err4); 3852 } else { 3853 future.complete(result); 3854 } 3855 }); 3856 } 3857 }); 3858 } else { 3859 addListener(createTable(newTableDesc), (result, err5) -> { 3860 if (err5 != null) { 3861 future.completeExceptionally(err5); 3862 } else { 3863 future.complete(result); 3864 } 3865 }); 3866 } 3867 }); 3868 }); 3869 }); 3870 return future; 3871 } 3872 3873 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 3874 List<RegionInfo> hris) { 3875 return this.<CacheEvictionStats> newAdminCaller() 3876 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 3877 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 3878 RequestConverter.buildClearRegionBlockCacheRequest(hris), 3879 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 3880 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 3881 .serverName(serverName).call(); 3882 } 3883 3884 @Override 3885 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 3886 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3887 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 3888 Boolean> call(controller, stub, 3889 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 3890 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 3891 resp -> resp.getPreviousRpcThrottleEnabled())) 3892 .call(); 3893 return future; 3894 } 3895 3896 @Override 3897 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 3898 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3899 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 3900 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 3901 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 3902 resp -> resp.getRpcThrottleEnabled())) 3903 .call(); 3904 return future; 3905 } 3906 3907 @Override 3908 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 3909 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3910 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 3911 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 3912 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 3913 .build(), 3914 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 3915 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 3916 .call(); 3917 return future; 3918 } 3919 3920 @Override 3921 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 3922 return this.<Map<TableName, Long>> newMasterCaller() 3923 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 3924 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 3925 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 3926 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 3927 resp -> resp.getSizesList().stream().collect(Collectors 3928 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 3929 .call(); 3930 } 3931 3932 @Override 3933 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 3934 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 3935 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 3936 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 3937 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 3938 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 3939 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 3940 resp -> resp.getSnapshotsList().stream() 3941 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 3942 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 3943 .serverName(serverName).call(); 3944 } 3945 3946 private CompletableFuture<SpaceQuotaSnapshot> 3947 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 3948 return this.<SpaceQuotaSnapshot> newMasterCaller() 3949 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 3950 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 3951 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 3952 .call(); 3953 } 3954 3955 @Override 3956 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 3957 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 3958 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 3959 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3960 } 3961 3962 @Override 3963 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 3964 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 3965 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 3966 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 3967 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3968 } 3969 3970 @Override 3971 public CompletableFuture<Void> grant(UserPermission userPermission, 3972 boolean mergeExistingPermissions) { 3973 return this.<Void> newMasterCaller() 3974 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 3975 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 3976 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 3977 .call(); 3978 } 3979 3980 @Override 3981 public CompletableFuture<Void> revoke(UserPermission userPermission) { 3982 return this.<Void> newMasterCaller() 3983 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 3984 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 3985 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 3986 .call(); 3987 } 3988 3989 @Override 3990 public CompletableFuture<List<UserPermission>> 3991 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 3992 return this.<List<UserPermission>> newMasterCaller() 3993 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 3994 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 3995 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 3996 (s, c, req, done) -> s.getUserPermissions(c, req, done), 3997 resp -> resp.getUserPermissionList().stream() 3998 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 3999 .collect(Collectors.toList()))) 4000 .call(); 4001 } 4002 4003 @Override 4004 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4005 List<Permission> permissions) { 4006 return this.<List<Boolean>> newMasterCaller() 4007 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4008 List<Boolean>> call(controller, stub, 4009 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4010 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4011 resp -> resp.getHasUserPermissionList())) 4012 .call(); 4013 } 4014 4015 @Override 4016 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4017 return this.<Boolean> newMasterCaller() 4018 .action((controller, stub) -> this.call(controller, stub, 4019 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4020 MasterService.Interface::switchSnapshotCleanup, 4021 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4022 .call(); 4023 } 4024 4025 @Override 4026 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4027 return this.<Boolean> newMasterCaller() 4028 .action((controller, stub) -> this.call(controller, stub, 4029 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4030 MasterService.Interface::isSnapshotCleanupEnabled, 4031 IsSnapshotCleanupEnabledResponse::getEnabled)) 4032 .call(); 4033 } 4034 4035 @Override 4036 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4037 return this.<Void> newMasterCaller() 4038 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4039 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4040 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4041 .call(); 4042 } 4043 4044 @Override 4045 public CompletableFuture<Void> addRSGroup(String groupName) { 4046 return this.<Void> newMasterCaller() 4047 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4048 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4049 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4050 .call(); 4051 } 4052 4053 @Override 4054 public CompletableFuture<Void> removeRSGroup(String groupName) { 4055 return this.<Void> newMasterCaller() 4056 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4057 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4058 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4059 .call(); 4060 } 4061 4062 @Override 4063 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4064 BalanceRequest request) { 4065 return this.<BalanceResponse> newMasterCaller() 4066 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4067 BalanceResponse> call(controller, stub, 4068 ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4069 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4070 .call(); 4071 } 4072 4073 @Override 4074 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4075 return this.<List<RSGroupInfo>> newMasterCaller() 4076 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4077 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4078 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4079 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4080 .call(); 4081 } 4082 4083 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4084 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4085 final String logType) { 4086 if (CollectionUtils.isEmpty(serverNames)) { 4087 return CompletableFuture.completedFuture(Collections.emptyList()); 4088 } 4089 return CompletableFuture.supplyAsync(() -> serverNames 4090 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4091 filterParams, limit, logType)) 4092 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4093 } 4094 4095 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4096 Map<String, Object> filterParams, int limit, String logType) { 4097 return this.<List<LogEntry>> newAdminCaller() 4098 .action((controller, stub) -> this.adminCall(controller, stub, 4099 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4100 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4101 .serverName(serverName).call(); 4102 } 4103 4104 @Override 4105 public CompletableFuture<List<Boolean>> 4106 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4107 if (CollectionUtils.isEmpty(serverNames)) { 4108 return CompletableFuture.completedFuture(Collections.emptyList()); 4109 } 4110 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4111 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4112 return convertToFutureOfList(clearSlowLogResponseList); 4113 } 4114 4115 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4116 return this.<Boolean> newAdminCaller() 4117 .action((controller, stub) -> this.adminCall(controller, stub, 4118 RequestConverter.buildClearSlowLogResponseRequest(), 4119 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4120 .serverName(serverName).call(); 4121 } 4122 4123 private static <T> CompletableFuture<List<T>> 4124 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4125 CompletableFuture<Void> allDoneFuture = 4126 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4127 return allDoneFuture 4128 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4129 } 4130 4131 @Override 4132 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4133 return this.<List<TableName>> newMasterCaller() 4134 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4135 List<TableName>> call(controller, stub, 4136 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4137 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4138 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4139 .call(); 4140 } 4141 4142 @Override 4143 public CompletableFuture<Pair<List<String>, List<TableName>>> 4144 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4145 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4146 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4147 GetConfiguredNamespacesAndTablesInRSGroupResponse, 4148 Pair<List<String>, List<TableName>>> call(controller, stub, 4149 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4150 .build(), 4151 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4152 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4153 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4154 .call(); 4155 } 4156 4157 @Override 4158 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4159 return this.<RSGroupInfo> newMasterCaller() 4160 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4161 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4162 GetRSGroupInfoOfServerRequest.newBuilder() 4163 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4164 .setPort(hostPort.getPort()).build()) 4165 .build(), 4166 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4167 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4168 .call(); 4169 } 4170 4171 @Override 4172 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4173 return this.<Void> newMasterCaller() 4174 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4175 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4176 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4177 .call(); 4178 } 4179 4180 @Override 4181 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4182 CompletableFuture<Void> future = new CompletableFuture<>(); 4183 for (TableName tableName : tables) { 4184 addListener(tableExists(tableName), (exist, err) -> { 4185 if (err != null) { 4186 future.completeExceptionally(err); 4187 return; 4188 } 4189 if (!exist) { 4190 future.completeExceptionally(new TableNotFoundException(tableName)); 4191 return; 4192 } 4193 }); 4194 } 4195 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4196 if (err != null) { 4197 future.completeExceptionally(err); 4198 return; 4199 } 4200 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4201 future.complete(null); 4202 return; 4203 } 4204 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4205 for (TableDescriptor td : tableDescriptions) { 4206 newTableDescriptors 4207 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4208 } 4209 addListener( 4210 CompletableFuture.allOf( 4211 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4212 (v, e) -> { 4213 if (e != null) { 4214 future.completeExceptionally(e); 4215 } else { 4216 future.complete(v); 4217 } 4218 }); 4219 }); 4220 return future; 4221 } 4222 4223 @Override 4224 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4225 return this.<RSGroupInfo> newMasterCaller() 4226 .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4227 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4228 GetRSGroupInfoOfTableRequest.newBuilder() 4229 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4230 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4231 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4232 .call(); 4233 } 4234 4235 @Override 4236 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4237 return this.<RSGroupInfo> newMasterCaller() 4238 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4239 RSGroupInfo> call(controller, stub, 4240 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4241 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4242 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4243 .call(); 4244 } 4245 4246 @Override 4247 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4248 return this.<Void> newMasterCaller() 4249 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4250 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4251 .setNewRsgroupName(newName).build(), 4252 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4253 .call(); 4254 } 4255 4256 @Override 4257 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4258 Map<String, String> configuration) { 4259 UpdateRSGroupConfigRequest.Builder request = 4260 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4261 if (configuration != null) { 4262 configuration.entrySet().forEach(e -> request.addConfiguration( 4263 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4264 } 4265 return this.<Void> newMasterCaller() 4266 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4267 Void> call(controller, stub, request.build(), 4268 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4269 .call(); 4270 } 4271 4272 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4273 return this.<List<LogEntry>> newMasterCaller() 4274 .action((controller, stub) -> this.call(controller, stub, 4275 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4276 ProtobufUtil::toBalancerDecisionResponse)) 4277 .call(); 4278 } 4279 4280 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4281 return this.<List<LogEntry>> newMasterCaller() 4282 .action((controller, stub) -> this.call(controller, stub, 4283 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4284 ProtobufUtil::toBalancerRejectionResponse)) 4285 .call(); 4286 } 4287 4288 @Override 4289 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4290 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4291 if (logType == null || serverType == null) { 4292 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4293 } 4294 switch (logType) { 4295 case "SLOW_LOG": 4296 case "LARGE_LOG": 4297 if (ServerType.MASTER.equals(serverType)) { 4298 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4299 } 4300 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4301 case "BALANCER_DECISION": 4302 if (ServerType.REGION_SERVER.equals(serverType)) { 4303 throw new IllegalArgumentException( 4304 "Balancer Decision logs are not maintained by HRegionServer"); 4305 } 4306 return getBalancerDecisions(limit); 4307 case "BALANCER_REJECTION": 4308 if (ServerType.REGION_SERVER.equals(serverType)) { 4309 throw new IllegalArgumentException( 4310 "Balancer Rejection logs are not maintained by HRegionServer"); 4311 } 4312 return getBalancerRejections(limit); 4313 default: 4314 return CompletableFuture.completedFuture(Collections.emptyList()); 4315 } 4316 } 4317 4318 @Override 4319 public CompletableFuture<Void> flushMasterStore() { 4320 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4321 return this.<Void> newMasterCaller() 4322 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4323 Void> call(controller, stub, request.build(), 4324 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4325 .call(); 4326 } 4327}