001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.Set; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentLinkedQueue; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.function.BiConsumer; 042import java.util.function.Consumer; 043import java.util.function.Function; 044import java.util.function.Supplier; 045import java.util.regex.Pattern; 046import java.util.stream.Collectors; 047import java.util.stream.Stream; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.CacheEvictionStats; 050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 051import org.apache.hadoop.hbase.CatalogFamilyFormat; 052import org.apache.hadoop.hbase.ClientMetaTableAccessor; 053import org.apache.hadoop.hbase.ClusterMetrics; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.ClusterMetricsBuilder; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.NamespaceDescriptor; 059import org.apache.hadoop.hbase.RegionLocations; 060import org.apache.hadoop.hbase.RegionMetrics; 061import org.apache.hadoop.hbase.RegionMetricsBuilder; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableExistsException; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.TableNotDisabledException; 066import org.apache.hadoop.hbase.TableNotEnabledException; 067import org.apache.hadoop.hbase.TableNotFoundException; 068import org.apache.hadoop.hbase.UnknownRegionException; 069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.Scan.ReadType; 073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 074import org.apache.hadoop.hbase.client.replication.TableCFs; 075import org.apache.hadoop.hbase.client.security.SecurityCapability; 076import org.apache.hadoop.hbase.exceptions.DeserializationException; 077import org.apache.hadoop.hbase.ipc.HBaseRpcController; 078import org.apache.hadoop.hbase.net.Address; 079import org.apache.hadoop.hbase.quotas.QuotaFilter; 080import org.apache.hadoop.hbase.quotas.QuotaSettings; 081import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 083import org.apache.hadoop.hbase.replication.ReplicationException; 084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 086import org.apache.hadoop.hbase.replication.SyncReplicationState; 087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 089import org.apache.hadoop.hbase.security.access.Permission; 090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 091import org.apache.hadoop.hbase.security.access.UserPermission; 092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 095import org.apache.hadoop.hbase.util.Bytes; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.yetus.audience.InterfaceAudience; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 104import org.apache.hbase.thirdparty.com.google.protobuf.Message; 105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 106import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 107import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 108import org.apache.hbase.thirdparty.io.netty.util.Timeout; 109import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 110import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 111 112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 113import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 349import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 350 351/** 352 * The implementation of AsyncAdmin. 353 * <p> 354 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 355 * be finished inside the rpc framework thread, which means that the callbacks registered to the 356 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 357 * this class should not try to do time consuming tasks in the callbacks. 358 * @since 2.0.0 359 * @see AsyncHBaseAdmin 360 * @see AsyncConnection#getAdmin() 361 * @see AsyncConnection#getAdminBuilder() 362 */ 363@InterfaceAudience.Private 364class RawAsyncHBaseAdmin implements AsyncAdmin { 365 366 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 367 368 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 369 370 private final AsyncConnectionImpl connection; 371 372 private final HashedWheelTimer retryTimer; 373 374 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 375 376 private final long rpcTimeoutNs; 377 378 private final long operationTimeoutNs; 379 380 private final long pauseNs; 381 382 private final long pauseNsForServerOverloaded; 383 384 private final int maxAttempts; 385 386 private final int startLogErrorsCnt; 387 388 private final NonceGenerator ng; 389 390 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 391 AsyncAdminBuilderBase builder) { 392 this.connection = connection; 393 this.retryTimer = retryTimer; 394 this.metaTable = connection.getTable(META_TABLE_NAME); 395 this.rpcTimeoutNs = builder.rpcTimeoutNs; 396 this.operationTimeoutNs = builder.operationTimeoutNs; 397 this.pauseNs = builder.pauseNs; 398 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 399 LOG.warn( 400 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 401 + " the normal pause value {} ms, use the greater one instead", 402 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 403 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 404 this.pauseNsForServerOverloaded = builder.pauseNs; 405 } else { 406 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 407 } 408 this.maxAttempts = builder.maxAttempts; 409 this.startLogErrorsCnt = builder.startLogErrorsCnt; 410 this.ng = connection.getNonceGenerator(); 411 } 412 413 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 414 return this.connection.callerFactory.<T> masterRequest() 415 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 416 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 417 .pause(pauseNs, TimeUnit.NANOSECONDS) 418 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 419 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 420 } 421 422 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 423 return this.connection.callerFactory.<T> adminRequest() 424 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 425 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 426 .pause(pauseNs, TimeUnit.NANOSECONDS) 427 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 428 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 429 } 430 431 @FunctionalInterface 432 private interface MasterRpcCall<RESP, REQ> { 433 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 434 RpcCallback<RESP> done); 435 } 436 437 @FunctionalInterface 438 private interface AdminRpcCall<RESP, REQ> { 439 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 440 RpcCallback<RESP> done); 441 } 442 443 @FunctionalInterface 444 private interface Converter<D, S> { 445 D convert(S src) throws IOException; 446 } 447 448 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 449 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 450 Converter<RESP, PRESP> respConverter) { 451 CompletableFuture<RESP> future = new CompletableFuture<>(); 452 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 453 454 @Override 455 public void run(PRESP resp) { 456 if (controller.failed()) { 457 future.completeExceptionally(controller.getFailed()); 458 } else { 459 try { 460 future.complete(respConverter.convert(resp)); 461 } catch (IOException e) { 462 future.completeExceptionally(e); 463 } 464 } 465 } 466 }); 467 return future; 468 } 469 470 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 471 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 472 Converter<RESP, PRESP> respConverter) { 473 CompletableFuture<RESP> future = new CompletableFuture<>(); 474 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 475 476 @Override 477 public void run(PRESP resp) { 478 if (controller.failed()) { 479 future.completeExceptionally(controller.getFailed()); 480 } else { 481 try { 482 future.complete(respConverter.convert(resp)); 483 } catch (IOException e) { 484 future.completeExceptionally(e); 485 } 486 } 487 } 488 }); 489 return future; 490 } 491 492 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 493 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 494 ProcedureBiConsumer consumer) { 495 return procedureCall(b -> { 496 }, preq, rpcCall, respConverter, consumer); 497 } 498 499 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 500 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 501 ProcedureBiConsumer consumer) { 502 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 503 } 504 505 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 506 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 507 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 508 ProcedureBiConsumer consumer) { 509 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 510 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 511 prioritySetter.accept(builder); 512 CompletableFuture<Long> procFuture = builder.call(); 513 CompletableFuture<Void> future = waitProcedureResult(procFuture); 514 addListener(future, consumer); 515 return future; 516 } 517 518 @Override 519 public CompletableFuture<Boolean> tableExists(TableName tableName) { 520 if (TableName.isMetaTableName(tableName)) { 521 return CompletableFuture.completedFuture(true); 522 } 523 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 524 } 525 526 @Override 527 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 528 return getTableDescriptors( 529 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 530 } 531 532 /** 533 * {@link #listTableDescriptors(boolean)} 534 */ 535 @Override 536 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 537 boolean includeSysTables) { 538 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 539 + "use listTableDescriptors(boolean) instead"); 540 return getTableDescriptors( 541 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 542 } 543 544 @Override 545 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 546 Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, " 547 + "use listTableDescriptors(boolean) instead"); 548 if (tableNames.isEmpty()) { 549 return CompletableFuture.completedFuture(Collections.emptyList()); 550 } 551 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 552 } 553 554 private CompletableFuture<List<TableDescriptor>> 555 getTableDescriptors(GetTableDescriptorsRequest request) { 556 return this.<List<TableDescriptor>> newMasterCaller() 557 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 558 List<TableDescriptor>> call(controller, stub, request, 559 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 560 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 561 .call(); 562 } 563 564 @Override 565 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 566 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 567 } 568 569 @Override 570 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 571 boolean includeSysTables) { 572 Preconditions.checkNotNull(pattern, 573 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 574 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 575 } 576 577 @Override 578 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 579 return this.<List<TableName>> newMasterCaller() 580 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 581 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 582 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 583 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 584 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 585 .call(); 586 } 587 588 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 589 return this.<List<TableName>> newMasterCaller() 590 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 591 List<TableName>> call(controller, stub, request, 592 (s, c, req, done) -> s.getTableNames(c, req, done), 593 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 594 .call(); 595 } 596 597 @Override 598 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 599 return this.<List<TableDescriptor>> newMasterCaller() 600 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 601 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 602 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 603 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 604 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 605 .call(); 606 } 607 608 @Override 609 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 610 return this.<List<TableDescriptor>> newMasterCaller() 611 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 612 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 613 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 614 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 615 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 616 .call(); 617 } 618 619 @Override 620 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 621 return this.<List<TableName>> newMasterCaller() 622 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 623 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 624 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 625 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 626 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 627 .call(); 628 } 629 630 @Override 631 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 632 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 633 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 634 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 635 List<TableSchema>> call(controller, stub, 636 RequestConverter.buildGetTableDescriptorsRequest(tableName), 637 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 638 (resp) -> resp.getTableSchemaList())) 639 .call(), (tableSchemas, error) -> { 640 if (error != null) { 641 future.completeExceptionally(error); 642 return; 643 } 644 if (!tableSchemas.isEmpty()) { 645 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 646 } else { 647 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 648 } 649 }); 650 return future; 651 } 652 653 @Override 654 public CompletableFuture<Void> createTable(TableDescriptor desc) { 655 return createTable(desc.getTableName(), 656 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 657 } 658 659 @Override 660 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 661 int numRegions) { 662 try { 663 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 664 } catch (IllegalArgumentException e) { 665 return failedFuture(e); 666 } 667 } 668 669 @Override 670 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 671 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 672 + " use createTable(TableDescriptor) instead"); 673 try { 674 verifySplitKeys(splitKeys); 675 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 676 splitKeys, ng.getNonceGroup(), ng.newNonce())); 677 } catch (IllegalArgumentException e) { 678 return failedFuture(e); 679 } 680 } 681 682 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 683 Preconditions.checkNotNull(tableName, "table name is null"); 684 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 685 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 686 new CreateTableProcedureBiConsumer(tableName)); 687 } 688 689 @Override 690 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 691 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 692 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 693 ng.newNonce()), 694 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 695 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 696 } 697 698 @Override 699 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 700 return this.<ModifyTableStoreFileTrackerRequest, 701 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 702 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 703 ng.getNonceGroup(), ng.newNonce()), 704 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 705 (resp) -> resp.getProcId(), 706 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 707 } 708 709 @Override 710 public CompletableFuture<Void> deleteTable(TableName tableName) { 711 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 712 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 713 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 714 new DeleteTableProcedureBiConsumer(tableName)); 715 } 716 717 @Override 718 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 719 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 720 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 721 ng.newNonce()), 722 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 723 new TruncateTableProcedureBiConsumer(tableName)); 724 } 725 726 @Override 727 public CompletableFuture<Void> enableTable(TableName tableName) { 728 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 729 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 730 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 731 new EnableTableProcedureBiConsumer(tableName)); 732 } 733 734 @Override 735 public CompletableFuture<Void> disableTable(TableName tableName) { 736 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 737 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 738 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 739 new DisableTableProcedureBiConsumer(tableName)); 740 } 741 742 /** 743 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 744 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 745 * targetState). 746 */ 747 private static CompletableFuture<Boolean> completeCheckTableState( 748 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 749 TableState.State targetState, TableName tableName) { 750 if (error != null) { 751 future.completeExceptionally(error); 752 } else { 753 if (tableState != null) { 754 future.complete(tableState.inStates(targetState)); 755 } else { 756 future.completeExceptionally(new TableNotFoundException(tableName)); 757 } 758 } 759 return future; 760 } 761 762 @Override 763 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 764 if (TableName.isMetaTableName(tableName)) { 765 return CompletableFuture.completedFuture(true); 766 } 767 CompletableFuture<Boolean> future = new CompletableFuture<>(); 768 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 769 (tableState, error) -> { 770 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 771 TableState.State.ENABLED, tableName); 772 }); 773 return future; 774 } 775 776 @Override 777 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 778 if (TableName.isMetaTableName(tableName)) { 779 return CompletableFuture.completedFuture(false); 780 } 781 CompletableFuture<Boolean> future = new CompletableFuture<>(); 782 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 783 (tableState, error) -> { 784 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 785 TableState.State.DISABLED, tableName); 786 }); 787 return future; 788 } 789 790 @Override 791 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 792 if (TableName.isMetaTableName(tableName)) { 793 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 794 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 795 } 796 CompletableFuture<Boolean> future = new CompletableFuture<>(); 797 addListener(isTableEnabled(tableName), (enabled, error) -> { 798 if (error != null) { 799 if (error instanceof TableNotFoundException) { 800 future.complete(false); 801 } else { 802 future.completeExceptionally(error); 803 } 804 return; 805 } 806 if (!enabled) { 807 future.complete(false); 808 } else { 809 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 810 (locations, error1) -> { 811 if (error1 != null) { 812 future.completeExceptionally(error1); 813 return; 814 } 815 List<HRegionLocation> notDeployedRegions = locations.stream() 816 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 817 if (notDeployedRegions.size() > 0) { 818 if (LOG.isDebugEnabled()) { 819 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 820 } 821 future.complete(false); 822 return; 823 } 824 future.complete(true); 825 }); 826 } 827 }); 828 return future; 829 } 830 831 @Override 832 public CompletableFuture<Void> addColumnFamily(TableName tableName, 833 ColumnFamilyDescriptor columnFamily) { 834 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 835 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 836 ng.newNonce()), 837 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 838 new AddColumnFamilyProcedureBiConsumer(tableName)); 839 } 840 841 @Override 842 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 843 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 844 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 845 ng.newNonce()), 846 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 847 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 848 } 849 850 @Override 851 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 852 ColumnFamilyDescriptor columnFamily) { 853 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 854 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 855 ng.newNonce()), 856 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 857 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 858 } 859 860 @Override 861 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 862 byte[] family, String dstSFT) { 863 return this.<ModifyColumnStoreFileTrackerRequest, 864 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 865 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 866 ng.getNonceGroup(), ng.newNonce()), 867 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 868 (resp) -> resp.getProcId(), 869 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 870 } 871 872 @Override 873 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 874 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 875 RequestConverter.buildCreateNamespaceRequest(descriptor), 876 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 877 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 878 } 879 880 @Override 881 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 882 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 883 RequestConverter.buildModifyNamespaceRequest(descriptor), 884 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 885 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 886 } 887 888 @Override 889 public CompletableFuture<Void> deleteNamespace(String name) { 890 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 891 RequestConverter.buildDeleteNamespaceRequest(name), 892 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 893 new DeleteNamespaceProcedureBiConsumer(name)); 894 } 895 896 @Override 897 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 898 return this.<NamespaceDescriptor> newMasterCaller() 899 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 900 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 901 RequestConverter.buildGetNamespaceDescriptorRequest(name), 902 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 903 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 904 .call(); 905 } 906 907 @Override 908 public CompletableFuture<List<String>> listNamespaces() { 909 return this.<List<String>> newMasterCaller() 910 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 911 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 912 (s, c, req, done) -> s.listNamespaces(c, req, done), 913 (resp) -> resp.getNamespaceNameList())) 914 .call(); 915 } 916 917 @Override 918 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 919 return this.<List<NamespaceDescriptor>> newMasterCaller() 920 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 921 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 922 ListNamespaceDescriptorsRequest.newBuilder().build(), 923 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 924 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 925 .call(); 926 } 927 928 @Override 929 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 930 return this.<List<RegionInfo>> newAdminCaller() 931 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 932 List<RegionInfo>> adminCall(controller, stub, 933 RequestConverter.buildGetOnlineRegionRequest(), 934 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 935 resp -> ProtobufUtil.getRegionInfos(resp))) 936 .serverName(serverName).call(); 937 } 938 939 @Override 940 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 941 if (tableName.equals(META_TABLE_NAME)) { 942 return connection.registry.getMetaRegionLocations() 943 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 944 .collect(Collectors.toList())); 945 } else { 946 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 947 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 948 } 949 } 950 951 @Override 952 public CompletableFuture<Void> flush(TableName tableName) { 953 return flush(tableName, null); 954 } 955 956 @Override 957 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 958 CompletableFuture<Void> future = new CompletableFuture<>(); 959 addListener(tableExists(tableName), (exists, err) -> { 960 if (err != null) { 961 future.completeExceptionally(err); 962 } else if (!exists) { 963 future.completeExceptionally(new TableNotFoundException(tableName)); 964 } else { 965 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 966 if (err2 != null) { 967 future.completeExceptionally(err2); 968 } else if (!tableEnabled) { 969 future.completeExceptionally(new TableNotEnabledException(tableName)); 970 } else { 971 Map<String, String> props = new HashMap<>(); 972 if (columnFamily != null) { 973 props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); 974 } 975 addListener( 976 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 977 (ret, err3) -> { 978 if (err3 != null) { 979 future.completeExceptionally(err3); 980 } else { 981 future.complete(ret); 982 } 983 }); 984 } 985 }); 986 } 987 }); 988 return future; 989 } 990 991 @Override 992 public CompletableFuture<Void> flushRegion(byte[] regionName) { 993 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 994 }); 995 } 996 997 @Override 998 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 999 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1000 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 1001 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 1002 }); 1003 } 1004 1005 /** 1006 * This method is for internal use only, where we need the response of the flush. 1007 * <p/> 1008 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1009 * API. 1010 */ 1011 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1012 boolean writeFlushWALMarker) { 1013 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1014 addListener(getRegionLocation(regionName), (location, err) -> { 1015 if (err != null) { 1016 future.completeExceptionally(err); 1017 return; 1018 } 1019 ServerName serverName = location.getServerName(); 1020 if (serverName == null) { 1021 future 1022 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1023 return; 1024 } 1025 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1026 (ret, err2) -> { 1027 if (err2 != null) { 1028 future.completeExceptionally(err2); 1029 } else { 1030 future.complete(ret); 1031 } 1032 }); 1033 }); 1034 return future; 1035 } 1036 1037 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1038 byte[] columnFamily, boolean writeFlushWALMarker) { 1039 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1040 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1041 FlushRegionResponse> adminCall(controller, stub, 1042 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1043 writeFlushWALMarker), 1044 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1045 .call(); 1046 } 1047 1048 @Override 1049 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1050 CompletableFuture<Void> future = new CompletableFuture<>(); 1051 addListener(getRegions(sn), (hRegionInfos, err) -> { 1052 if (err != null) { 1053 future.completeExceptionally(err); 1054 return; 1055 } 1056 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1057 if (hRegionInfos != null) { 1058 hRegionInfos 1059 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1060 }))); 1061 } 1062 addListener(CompletableFuture.allOf( 1063 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1064 if (err2 != null) { 1065 future.completeExceptionally(err2); 1066 } else { 1067 future.complete(ret); 1068 } 1069 }); 1070 }); 1071 return future; 1072 } 1073 1074 @Override 1075 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1076 return compact(tableName, null, false, compactType); 1077 } 1078 1079 @Override 1080 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1081 CompactType compactType) { 1082 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1083 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1084 return compact(tableName, columnFamily, false, compactType); 1085 } 1086 1087 @Override 1088 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1089 return compactRegion(regionName, null, false); 1090 } 1091 1092 @Override 1093 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1094 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1095 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1096 return compactRegion(regionName, columnFamily, false); 1097 } 1098 1099 @Override 1100 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1101 return compact(tableName, null, true, compactType); 1102 } 1103 1104 @Override 1105 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1106 CompactType compactType) { 1107 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1108 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1109 return compact(tableName, columnFamily, true, compactType); 1110 } 1111 1112 @Override 1113 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1114 return compactRegion(regionName, null, true); 1115 } 1116 1117 @Override 1118 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1119 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1120 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1121 return compactRegion(regionName, columnFamily, true); 1122 } 1123 1124 @Override 1125 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1126 return compactRegionServer(sn, false); 1127 } 1128 1129 @Override 1130 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1131 return compactRegionServer(sn, true); 1132 } 1133 1134 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1135 CompletableFuture<Void> future = new CompletableFuture<>(); 1136 addListener(getRegions(sn), (hRegionInfos, err) -> { 1137 if (err != null) { 1138 future.completeExceptionally(err); 1139 return; 1140 } 1141 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1142 if (hRegionInfos != null) { 1143 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1144 } 1145 addListener(CompletableFuture.allOf( 1146 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1147 if (err2 != null) { 1148 future.completeExceptionally(err2); 1149 } else { 1150 future.complete(ret); 1151 } 1152 }); 1153 }); 1154 return future; 1155 } 1156 1157 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1158 boolean major) { 1159 CompletableFuture<Void> future = new CompletableFuture<>(); 1160 addListener(getRegionLocation(regionName), (location, err) -> { 1161 if (err != null) { 1162 future.completeExceptionally(err); 1163 return; 1164 } 1165 ServerName serverName = location.getServerName(); 1166 if (serverName == null) { 1167 future 1168 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1169 return; 1170 } 1171 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1172 (ret, err2) -> { 1173 if (err2 != null) { 1174 future.completeExceptionally(err2); 1175 } else { 1176 future.complete(ret); 1177 } 1178 }); 1179 }); 1180 return future; 1181 } 1182 1183 /** 1184 * List all region locations for the specific table. 1185 */ 1186 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1187 if (TableName.META_TABLE_NAME.equals(tableName)) { 1188 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1189 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1190 if (err != null) { 1191 future.completeExceptionally(err); 1192 } else if ( 1193 metaRegions == null || metaRegions.isEmpty() 1194 || metaRegions.getDefaultRegionLocation() == null 1195 ) { 1196 future.completeExceptionally(new IOException("meta region does not found")); 1197 } else { 1198 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1199 } 1200 }); 1201 return future; 1202 } else { 1203 // For non-meta table, we fetch all locations by scanning hbase:meta table 1204 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1205 } 1206 } 1207 1208 /** 1209 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1210 */ 1211 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1212 CompactType compactType) { 1213 CompletableFuture<Void> future = new CompletableFuture<>(); 1214 1215 switch (compactType) { 1216 case MOB: 1217 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1218 if (err != null) { 1219 future.completeExceptionally(err); 1220 return; 1221 } 1222 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1223 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1224 if (err2 != null) { 1225 future.completeExceptionally(err2); 1226 } else { 1227 future.complete(ret); 1228 } 1229 }); 1230 }); 1231 break; 1232 case NORMAL: 1233 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1234 if (err != null) { 1235 future.completeExceptionally(err); 1236 return; 1237 } 1238 if (locations == null || locations.isEmpty()) { 1239 future.completeExceptionally(new TableNotFoundException(tableName)); 1240 } 1241 CompletableFuture<?>[] compactFutures = 1242 locations.stream().filter(l -> l.getRegion() != null) 1243 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1244 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1245 .toArray(CompletableFuture<?>[]::new); 1246 // future complete unless all of the compact futures are completed. 1247 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1248 if (err2 != null) { 1249 future.completeExceptionally(err2); 1250 } else { 1251 future.complete(ret); 1252 } 1253 }); 1254 }); 1255 break; 1256 default: 1257 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1258 } 1259 return future; 1260 } 1261 1262 /** 1263 * Compact the region at specific region server. 1264 */ 1265 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1266 final boolean major, byte[] columnFamily) { 1267 return this.<Void> newAdminCaller().serverName(sn) 1268 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1269 Void> adminCall(controller, stub, 1270 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1271 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1272 .call(); 1273 } 1274 1275 private byte[] toEncodeRegionName(byte[] regionName) { 1276 return RegionInfo.isEncodedRegionName(regionName) 1277 ? regionName 1278 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1279 } 1280 1281 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1282 CompletableFuture<TableName> result) { 1283 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1284 if (err != null) { 1285 result.completeExceptionally(err); 1286 return; 1287 } 1288 RegionInfo regionInfo = location.getRegion(); 1289 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1290 result.completeExceptionally( 1291 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1292 return; 1293 } 1294 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1295 if (!tableName.get().equals(regionInfo.getTable())) { 1296 // tables of this two region should be same. 1297 result.completeExceptionally( 1298 new IllegalArgumentException("Cannot merge regions from two different tables " 1299 + tableName.get() + " and " + regionInfo.getTable())); 1300 } else { 1301 result.complete(tableName.get()); 1302 } 1303 } 1304 }); 1305 } 1306 1307 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1308 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1309 CompletableFuture<TableName> future = new CompletableFuture<>(); 1310 for (byte[] encodedRegionName : encodedRegionNames) { 1311 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1312 } 1313 return future; 1314 } 1315 1316 @Override 1317 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1318 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1319 } 1320 1321 @Override 1322 public CompletableFuture<Boolean> isMergeEnabled() { 1323 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1324 } 1325 1326 @Override 1327 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1328 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1329 } 1330 1331 @Override 1332 public CompletableFuture<Boolean> isSplitEnabled() { 1333 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1334 } 1335 1336 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1337 MasterSwitchType switchType) { 1338 SetSplitOrMergeEnabledRequest request = 1339 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1340 return this.<Boolean> newMasterCaller() 1341 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1342 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1343 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1344 (resp) -> resp.getPrevValueList().get(0))) 1345 .call(); 1346 } 1347 1348 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1349 IsSplitOrMergeEnabledRequest request = 1350 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1351 return this.<Boolean> newMasterCaller() 1352 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1353 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1354 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1355 .call(); 1356 } 1357 1358 @Override 1359 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1360 if (nameOfRegionsToMerge.size() < 2) { 1361 return failedFuture(new IllegalArgumentException( 1362 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1363 } 1364 CompletableFuture<Void> future = new CompletableFuture<>(); 1365 byte[][] encodedNameOfRegionsToMerge = 1366 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1367 1368 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1369 if (err != null) { 1370 future.completeExceptionally(err); 1371 return; 1372 } 1373 1374 final MergeTableRegionsRequest request; 1375 try { 1376 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1377 forcible, ng.getNonceGroup(), ng.newNonce()); 1378 } catch (DeserializationException e) { 1379 future.completeExceptionally(e); 1380 return; 1381 } 1382 1383 addListener( 1384 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1385 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1386 (ret, err2) -> { 1387 if (err2 != null) { 1388 future.completeExceptionally(err2); 1389 } else { 1390 future.complete(ret); 1391 } 1392 }); 1393 }); 1394 return future; 1395 } 1396 1397 @Override 1398 public CompletableFuture<Void> split(TableName tableName) { 1399 CompletableFuture<Void> future = new CompletableFuture<>(); 1400 addListener(tableExists(tableName), (exist, error) -> { 1401 if (error != null) { 1402 future.completeExceptionally(error); 1403 return; 1404 } 1405 if (!exist) { 1406 future.completeExceptionally(new TableNotFoundException(tableName)); 1407 return; 1408 } 1409 addListener(metaTable 1410 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1411 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1412 ClientMetaTableAccessor.QueryType.REGION)) 1413 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1414 ClientMetaTableAccessor.QueryType.REGION))), 1415 (results, err2) -> { 1416 if (err2 != null) { 1417 future.completeExceptionally(err2); 1418 return; 1419 } 1420 if (results != null && !results.isEmpty()) { 1421 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1422 for (Result r : results) { 1423 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1424 continue; 1425 } 1426 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1427 if (rl != null) { 1428 for (HRegionLocation h : rl.getRegionLocations()) { 1429 if (h != null && h.getServerName() != null) { 1430 RegionInfo hri = h.getRegion(); 1431 if ( 1432 hri == null || hri.isSplitParent() 1433 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1434 ) { 1435 continue; 1436 } 1437 splitFutures.add(split(hri, null)); 1438 } 1439 } 1440 } 1441 } 1442 addListener( 1443 CompletableFuture 1444 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1445 (ret, exception) -> { 1446 if (exception != null) { 1447 future.completeExceptionally(exception); 1448 return; 1449 } 1450 future.complete(ret); 1451 }); 1452 } else { 1453 future.complete(null); 1454 } 1455 }); 1456 }); 1457 return future; 1458 } 1459 1460 @Override 1461 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1462 CompletableFuture<Void> result = new CompletableFuture<>(); 1463 if (splitPoint == null) { 1464 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1465 } 1466 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1467 (loc, err) -> { 1468 if (err != null) { 1469 result.completeExceptionally(err); 1470 } else if (loc == null || loc.getRegion() == null) { 1471 result.completeExceptionally(new IllegalArgumentException( 1472 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1473 } else { 1474 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1475 if (err2 != null) { 1476 result.completeExceptionally(err2); 1477 } else { 1478 result.complete(ret); 1479 } 1480 1481 }); 1482 } 1483 }); 1484 return result; 1485 } 1486 1487 @Override 1488 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1489 CompletableFuture<Void> future = new CompletableFuture<>(); 1490 addListener(getRegionLocation(regionName), (location, err) -> { 1491 if (err != null) { 1492 future.completeExceptionally(err); 1493 return; 1494 } 1495 RegionInfo regionInfo = location.getRegion(); 1496 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1497 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1498 + "Replicas are auto-split when their primary is split.")); 1499 return; 1500 } 1501 ServerName serverName = location.getServerName(); 1502 if (serverName == null) { 1503 future 1504 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1505 return; 1506 } 1507 addListener(split(regionInfo, null), (ret, err2) -> { 1508 if (err2 != null) { 1509 future.completeExceptionally(err2); 1510 } else { 1511 future.complete(ret); 1512 } 1513 }); 1514 }); 1515 return future; 1516 } 1517 1518 @Override 1519 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1520 Preconditions.checkNotNull(splitPoint, 1521 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1522 CompletableFuture<Void> future = new CompletableFuture<>(); 1523 addListener(getRegionLocation(regionName), (location, err) -> { 1524 if (err != null) { 1525 future.completeExceptionally(err); 1526 return; 1527 } 1528 RegionInfo regionInfo = location.getRegion(); 1529 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1530 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1531 + "Replicas are auto-split when their primary is split.")); 1532 return; 1533 } 1534 ServerName serverName = location.getServerName(); 1535 if (serverName == null) { 1536 future 1537 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1538 return; 1539 } 1540 if ( 1541 regionInfo.getStartKey() != null 1542 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1543 ) { 1544 future.completeExceptionally( 1545 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1546 return; 1547 } 1548 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1549 if (err2 != null) { 1550 future.completeExceptionally(err2); 1551 } else { 1552 future.complete(ret); 1553 } 1554 }); 1555 }); 1556 return future; 1557 } 1558 1559 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1560 CompletableFuture<Void> future = new CompletableFuture<>(); 1561 TableName tableName = hri.getTable(); 1562 final SplitTableRegionRequest request; 1563 try { 1564 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1565 ng.newNonce()); 1566 } catch (DeserializationException e) { 1567 future.completeExceptionally(e); 1568 return future; 1569 } 1570 1571 addListener( 1572 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1573 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1574 (ret, err2) -> { 1575 if (err2 != null) { 1576 future.completeExceptionally(err2); 1577 } else { 1578 future.complete(ret); 1579 } 1580 }); 1581 return future; 1582 } 1583 1584 @Override 1585 public CompletableFuture<Void> assign(byte[] regionName) { 1586 CompletableFuture<Void> future = new CompletableFuture<>(); 1587 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1588 if (err != null) { 1589 future.completeExceptionally(err); 1590 return; 1591 } 1592 addListener( 1593 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1594 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1595 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1596 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1597 .call(), 1598 (ret, err2) -> { 1599 if (err2 != null) { 1600 future.completeExceptionally(err2); 1601 } else { 1602 future.complete(ret); 1603 } 1604 }); 1605 }); 1606 return future; 1607 } 1608 1609 @Override 1610 public CompletableFuture<Void> unassign(byte[] regionName) { 1611 CompletableFuture<Void> future = new CompletableFuture<>(); 1612 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1613 if (err != null) { 1614 future.completeExceptionally(err); 1615 return; 1616 } 1617 addListener( 1618 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1619 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1620 Void> call(controller, stub, 1621 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1622 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1623 .call(), 1624 (ret, err2) -> { 1625 if (err2 != null) { 1626 future.completeExceptionally(err2); 1627 } else { 1628 future.complete(ret); 1629 } 1630 }); 1631 }); 1632 return future; 1633 } 1634 1635 @Override 1636 public CompletableFuture<Void> offline(byte[] regionName) { 1637 CompletableFuture<Void> future = new CompletableFuture<>(); 1638 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1639 if (err != null) { 1640 future.completeExceptionally(err); 1641 return; 1642 } 1643 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1644 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1645 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1646 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1647 .call(), (ret, err2) -> { 1648 if (err2 != null) { 1649 future.completeExceptionally(err2); 1650 } else { 1651 future.complete(ret); 1652 } 1653 }); 1654 }); 1655 return future; 1656 } 1657 1658 @Override 1659 public CompletableFuture<Void> move(byte[] regionName) { 1660 CompletableFuture<Void> future = new CompletableFuture<>(); 1661 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1662 if (err != null) { 1663 future.completeExceptionally(err); 1664 return; 1665 } 1666 addListener( 1667 moveRegion(regionInfo, 1668 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1669 (ret, err2) -> { 1670 if (err2 != null) { 1671 future.completeExceptionally(err2); 1672 } else { 1673 future.complete(ret); 1674 } 1675 }); 1676 }); 1677 return future; 1678 } 1679 1680 @Override 1681 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1682 Preconditions.checkNotNull(destServerName, 1683 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1684 CompletableFuture<Void> future = new CompletableFuture<>(); 1685 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1686 if (err != null) { 1687 future.completeExceptionally(err); 1688 return; 1689 } 1690 addListener( 1691 moveRegion(regionInfo, RequestConverter 1692 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1693 (ret, err2) -> { 1694 if (err2 != null) { 1695 future.completeExceptionally(err2); 1696 } else { 1697 future.complete(ret); 1698 } 1699 }); 1700 }); 1701 return future; 1702 } 1703 1704 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1705 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1706 .action( 1707 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1708 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1709 .call(); 1710 } 1711 1712 @Override 1713 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1714 return this.<Void> newMasterCaller() 1715 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1716 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1717 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1718 .call(); 1719 } 1720 1721 @Override 1722 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1723 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1724 Scan scan = QuotaTableUtil.makeScan(filter); 1725 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1726 new AdvancedScanResultConsumer() { 1727 List<QuotaSettings> settings = new ArrayList<>(); 1728 1729 @Override 1730 public void onNext(Result[] results, ScanController controller) { 1731 for (Result result : results) { 1732 try { 1733 QuotaTableUtil.parseResultToCollection(result, settings); 1734 } catch (IOException e) { 1735 controller.terminate(); 1736 future.completeExceptionally(e); 1737 } 1738 } 1739 } 1740 1741 @Override 1742 public void onError(Throwable error) { 1743 future.completeExceptionally(error); 1744 } 1745 1746 @Override 1747 public void onComplete() { 1748 future.complete(settings); 1749 } 1750 }); 1751 return future; 1752 } 1753 1754 @Override 1755 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1756 boolean enabled) { 1757 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1758 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1759 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1760 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1761 } 1762 1763 @Override 1764 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1765 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1766 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1767 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1768 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1769 } 1770 1771 @Override 1772 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1773 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1774 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1775 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1776 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1777 } 1778 1779 @Override 1780 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1781 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1782 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1783 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1784 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1785 } 1786 1787 @Override 1788 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1789 return this.<ReplicationPeerConfig> newMasterCaller() 1790 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1791 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1792 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1793 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1794 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1795 .call(); 1796 } 1797 1798 @Override 1799 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1800 ReplicationPeerConfig peerConfig) { 1801 return this.<UpdateReplicationPeerConfigRequest, 1802 UpdateReplicationPeerConfigResponse> procedureCall( 1803 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1804 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1805 (resp) -> resp.getProcId(), 1806 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1807 } 1808 1809 @Override 1810 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1811 SyncReplicationState clusterState) { 1812 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1813 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1814 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1815 clusterState), 1816 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1817 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1818 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1819 } 1820 1821 @Override 1822 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1823 Map<TableName, List<String>> tableCfs) { 1824 if (tableCfs == null) { 1825 return failedFuture(new ReplicationException("tableCfs is null")); 1826 } 1827 1828 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1829 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1830 if (!completeExceptionally(future, error)) { 1831 ReplicationPeerConfig newPeerConfig = 1832 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1833 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1834 if (!completeExceptionally(future, error)) { 1835 future.complete(result); 1836 } 1837 }); 1838 } 1839 }); 1840 return future; 1841 } 1842 1843 @Override 1844 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1845 Map<TableName, List<String>> tableCfs) { 1846 if (tableCfs == null) { 1847 return failedFuture(new ReplicationException("tableCfs is null")); 1848 } 1849 1850 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1851 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1852 if (!completeExceptionally(future, error)) { 1853 ReplicationPeerConfig newPeerConfig = null; 1854 try { 1855 newPeerConfig = ReplicationPeerConfigUtil 1856 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1857 } catch (ReplicationException e) { 1858 future.completeExceptionally(e); 1859 return; 1860 } 1861 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1862 if (!completeExceptionally(future, error)) { 1863 future.complete(result); 1864 } 1865 }); 1866 } 1867 }); 1868 return future; 1869 } 1870 1871 @Override 1872 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1873 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1874 } 1875 1876 @Override 1877 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1878 Preconditions.checkNotNull(pattern, 1879 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1880 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1881 } 1882 1883 private CompletableFuture<List<ReplicationPeerDescription>> 1884 listReplicationPeers(ListReplicationPeersRequest request) { 1885 return this.<List<ReplicationPeerDescription>> newMasterCaller() 1886 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1887 List<ReplicationPeerDescription>> call(controller, stub, request, 1888 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1889 (resp) -> resp.getPeerDescList().stream() 1890 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1891 .collect(Collectors.toList()))) 1892 .call(); 1893 } 1894 1895 @Override 1896 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1897 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1898 addListener(listTableDescriptors(), (tables, error) -> { 1899 if (!completeExceptionally(future, error)) { 1900 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1901 tables.forEach(table -> { 1902 Map<String, Integer> cfs = new HashMap<>(); 1903 Stream.of(table.getColumnFamilies()) 1904 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1905 .forEach(column -> { 1906 cfs.put(column.getNameAsString(), column.getScope()); 1907 }); 1908 if (!cfs.isEmpty()) { 1909 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1910 } 1911 }); 1912 future.complete(replicatedTableCFs); 1913 } 1914 }); 1915 return future; 1916 } 1917 1918 @Override 1919 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1920 SnapshotProtos.SnapshotDescription snapshot = 1921 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1922 try { 1923 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1924 } catch (IllegalArgumentException e) { 1925 return failedFuture(e); 1926 } 1927 CompletableFuture<Void> future = new CompletableFuture<>(); 1928 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 1929 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 1930 addListener(this.<SnapshotResponse> newMasterCaller() 1931 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 1932 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 1933 .call(), (resp, err) -> { 1934 if (err != null) { 1935 future.completeExceptionally(err); 1936 return; 1937 } 1938 waitSnapshotFinish(snapshotDesc, future, resp); 1939 }); 1940 return future; 1941 } 1942 1943 // This is for keeping compatibility with old implementation. 1944 // If there is a procId field in the response, then the snapshot will be operated with a 1945 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 1946 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 1947 SnapshotResponse resp) { 1948 if (resp.hasProcId()) { 1949 getProcedureResult(resp.getProcId(), future, 0); 1950 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 1951 } else { 1952 long expectedTimeout = resp.getExpectedTimeout(); 1953 TimerTask pollingTask = new TimerTask() { 1954 int tries = 0; 1955 long startTime = EnvironmentEdgeManager.currentTime(); 1956 long endTime = startTime + expectedTimeout; 1957 long maxPauseTime = expectedTimeout / maxAttempts; 1958 1959 @Override 1960 public void run(Timeout timeout) throws Exception { 1961 if (EnvironmentEdgeManager.currentTime() < endTime) { 1962 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 1963 if (err2 != null) { 1964 future.completeExceptionally(err2); 1965 } else if (done) { 1966 future.complete(null); 1967 } else { 1968 // retry again after pauseTime. 1969 long pauseTime = 1970 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 1971 pauseTime = Math.min(pauseTime, maxPauseTime); 1972 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 1973 } 1974 }); 1975 } else { 1976 future 1977 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 1978 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 1979 } 1980 } 1981 }; 1982 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 1983 } 1984 } 1985 1986 @Override 1987 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 1988 return this.<Boolean> newMasterCaller() 1989 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 1990 Boolean> call(controller, stub, 1991 IsSnapshotDoneRequest.newBuilder() 1992 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 1993 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 1994 .call(); 1995 } 1996 1997 @Override 1998 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 1999 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2000 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2001 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2002 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2003 } 2004 2005 @Override 2006 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2007 boolean restoreAcl) { 2008 CompletableFuture<Void> future = new CompletableFuture<>(); 2009 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2010 if (err != null) { 2011 future.completeExceptionally(err); 2012 return; 2013 } 2014 TableName tableName = null; 2015 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2016 for (SnapshotDescription snap : snapshotDescriptions) { 2017 if (snap.getName().equals(snapshotName)) { 2018 tableName = snap.getTableName(); 2019 break; 2020 } 2021 } 2022 } 2023 if (tableName == null) { 2024 future.completeExceptionally(new RestoreSnapshotException( 2025 "Unable to find the table name for snapshot=" + snapshotName)); 2026 return; 2027 } 2028 final TableName finalTableName = tableName; 2029 addListener(tableExists(finalTableName), (exists, err2) -> { 2030 if (err2 != null) { 2031 future.completeExceptionally(err2); 2032 } else if (!exists) { 2033 // if table does not exist, then just clone snapshot into new table. 2034 completeConditionalOnFuture(future, 2035 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2036 } else { 2037 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2038 if (err4 != null) { 2039 future.completeExceptionally(err4); 2040 } else if (!disabled) { 2041 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2042 } else { 2043 completeConditionalOnFuture(future, 2044 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2045 } 2046 }); 2047 } 2048 }); 2049 }); 2050 return future; 2051 } 2052 2053 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2054 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2055 if (takeFailSafeSnapshot) { 2056 CompletableFuture<Void> future = new CompletableFuture<>(); 2057 // Step.1 Take a snapshot of the current state 2058 String failSafeSnapshotSnapshotNameFormat = 2059 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2060 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2061 final String failSafeSnapshotSnapshotName = 2062 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2063 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2064 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2065 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2066 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2067 if (err != null) { 2068 future.completeExceptionally(err); 2069 } else { 2070 // Step.2 Restore snapshot 2071 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2072 (void2, err2) -> { 2073 if (err2 != null) { 2074 // Step.3.a Something went wrong during the restore and try to rollback. 2075 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2076 restoreAcl, null), (void3, err3) -> { 2077 if (err3 != null) { 2078 future.completeExceptionally(err3); 2079 } else { 2080 String msg = 2081 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" 2082 + failSafeSnapshotSnapshotName + " succeeded."; 2083 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2084 } 2085 }); 2086 } else { 2087 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 2088 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2089 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2090 if (err3 != null) { 2091 LOG.error( 2092 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2093 err3); 2094 } 2095 future.complete(ret3); 2096 }); 2097 } 2098 }); 2099 } 2100 }); 2101 return future; 2102 } else { 2103 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2104 } 2105 } 2106 2107 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2108 CompletableFuture<T> parentFuture) { 2109 addListener(parentFuture, (res, err) -> { 2110 if (err != null) { 2111 dependentFuture.completeExceptionally(err); 2112 } else { 2113 dependentFuture.complete(res); 2114 } 2115 }); 2116 } 2117 2118 @Override 2119 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2120 boolean restoreAcl, String customSFT) { 2121 CompletableFuture<Void> future = new CompletableFuture<>(); 2122 addListener(tableExists(tableName), (exists, err) -> { 2123 if (err != null) { 2124 future.completeExceptionally(err); 2125 } else if (exists) { 2126 future.completeExceptionally(new TableExistsException(tableName)); 2127 } else { 2128 completeConditionalOnFuture(future, 2129 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2130 } 2131 }); 2132 return future; 2133 } 2134 2135 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2136 boolean restoreAcl, String customSFT) { 2137 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2138 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2139 try { 2140 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2141 } catch (IllegalArgumentException e) { 2142 return failedFuture(e); 2143 } 2144 RestoreSnapshotRequest.Builder builder = 2145 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2146 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2147 if (customSFT != null) { 2148 builder.setCustomSFT(customSFT); 2149 } 2150 return waitProcedureResult(this.<Long> newMasterCaller() 2151 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2152 Long> call(controller, stub, builder.build(), 2153 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2154 .call()); 2155 } 2156 2157 @Override 2158 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2159 return getCompletedSnapshots(null); 2160 } 2161 2162 @Override 2163 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2164 Preconditions.checkNotNull(pattern, 2165 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2166 return getCompletedSnapshots(pattern); 2167 } 2168 2169 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2170 return this.<List<SnapshotDescription>> newMasterCaller() 2171 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2172 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2173 GetCompletedSnapshotsRequest.newBuilder().build(), 2174 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2175 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2176 .call(); 2177 } 2178 2179 @Override 2180 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2181 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2182 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2183 return getCompletedSnapshots(tableNamePattern, null); 2184 } 2185 2186 @Override 2187 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2188 Pattern snapshotNamePattern) { 2189 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2190 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2191 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2192 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2193 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2194 } 2195 2196 private CompletableFuture<List<SnapshotDescription>> 2197 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2198 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2199 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2200 if (err != null) { 2201 future.completeExceptionally(err); 2202 return; 2203 } 2204 if (tableNames == null || tableNames.size() <= 0) { 2205 future.complete(Collections.emptyList()); 2206 return; 2207 } 2208 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2209 if (err2 != null) { 2210 future.completeExceptionally(err2); 2211 return; 2212 } 2213 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2214 future.complete(Collections.emptyList()); 2215 return; 2216 } 2217 future.complete(snapshotDescList.stream() 2218 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2219 .collect(Collectors.toList())); 2220 }); 2221 }); 2222 return future; 2223 } 2224 2225 @Override 2226 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2227 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2228 } 2229 2230 @Override 2231 public CompletableFuture<Void> deleteSnapshots() { 2232 return internalDeleteSnapshots(null, null); 2233 } 2234 2235 @Override 2236 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2237 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2238 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2239 return internalDeleteSnapshots(null, snapshotNamePattern); 2240 } 2241 2242 @Override 2243 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2244 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2245 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2246 return internalDeleteSnapshots(tableNamePattern, null); 2247 } 2248 2249 @Override 2250 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2251 Pattern snapshotNamePattern) { 2252 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2253 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2254 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2255 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2256 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2257 } 2258 2259 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2260 Pattern snapshotNamePattern) { 2261 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2262 if (tableNamePattern == null) { 2263 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2264 } else { 2265 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2266 } 2267 CompletableFuture<Void> future = new CompletableFuture<>(); 2268 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2269 if (err != null) { 2270 future.completeExceptionally(err); 2271 return; 2272 } 2273 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2274 future.complete(null); 2275 return; 2276 } 2277 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2278 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2279 if (e != null) { 2280 future.completeExceptionally(e); 2281 } else { 2282 future.complete(v); 2283 } 2284 }); 2285 }); 2286 return future; 2287 } 2288 2289 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2290 return this.<Void> newMasterCaller() 2291 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2292 controller, stub, 2293 DeleteSnapshotRequest.newBuilder() 2294 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2295 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2296 .call(); 2297 } 2298 2299 @Override 2300 public CompletableFuture<Void> execProcedure(String signature, String instance, 2301 Map<String, String> props) { 2302 CompletableFuture<Void> future = new CompletableFuture<>(); 2303 ProcedureDescription procDesc = 2304 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2305 addListener(this.<Long> newMasterCaller() 2306 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2307 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2308 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2309 .call(), (expectedTimeout, err) -> { 2310 if (err != null) { 2311 future.completeExceptionally(err); 2312 return; 2313 } 2314 TimerTask pollingTask = new TimerTask() { 2315 int tries = 0; 2316 long startTime = EnvironmentEdgeManager.currentTime(); 2317 long endTime = startTime + expectedTimeout; 2318 long maxPauseTime = expectedTimeout / maxAttempts; 2319 2320 @Override 2321 public void run(Timeout timeout) throws Exception { 2322 if (EnvironmentEdgeManager.currentTime() < endTime) { 2323 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2324 if (err2 != null) { 2325 future.completeExceptionally(err2); 2326 return; 2327 } 2328 if (done) { 2329 future.complete(null); 2330 } else { 2331 // retry again after pauseTime. 2332 long pauseTime = 2333 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2334 pauseTime = Math.min(pauseTime, maxPauseTime); 2335 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2336 TimeUnit.MICROSECONDS); 2337 } 2338 }); 2339 } else { 2340 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2341 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2342 } 2343 } 2344 }; 2345 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2346 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2347 }); 2348 return future; 2349 } 2350 2351 @Override 2352 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2353 Map<String, String> props) { 2354 ProcedureDescription proDesc = 2355 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2356 return this.<byte[]> newMasterCaller() 2357 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2358 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2359 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2360 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2361 .call(); 2362 } 2363 2364 @Override 2365 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2366 Map<String, String> props) { 2367 ProcedureDescription proDesc = 2368 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2369 return this.<Boolean> newMasterCaller() 2370 .action( 2371 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2372 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2373 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2374 .call(); 2375 } 2376 2377 @Override 2378 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2379 return this.<Boolean> newMasterCaller().action( 2380 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2381 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2382 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2383 .call(); 2384 } 2385 2386 @Override 2387 public CompletableFuture<String> getProcedures() { 2388 return this.<String> newMasterCaller() 2389 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2390 controller, stub, GetProceduresRequest.newBuilder().build(), 2391 (s, c, req, done) -> s.getProcedures(c, req, done), 2392 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2393 .call(); 2394 } 2395 2396 @Override 2397 public CompletableFuture<String> getLocks() { 2398 return this.<String> newMasterCaller() 2399 .action( 2400 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2401 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2402 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2403 .call(); 2404 } 2405 2406 @Override 2407 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2408 boolean offload) { 2409 return this.<Void> newMasterCaller() 2410 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2411 DecommissionRegionServersResponse, Void> call(controller, stub, 2412 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2413 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2414 .call(); 2415 } 2416 2417 @Override 2418 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2419 return this.<List<ServerName>> newMasterCaller() 2420 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2421 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2422 ListDecommissionedRegionServersRequest.newBuilder().build(), 2423 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2424 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2425 .collect(Collectors.toList()))) 2426 .call(); 2427 } 2428 2429 @Override 2430 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2431 List<byte[]> encodedRegionNames) { 2432 return this.<Void> newMasterCaller() 2433 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2434 RecommissionRegionServerResponse, Void> call(controller, stub, 2435 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2436 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2437 .call(); 2438 } 2439 2440 /** 2441 * Get the region location for the passed region name. The region name may be a full region name 2442 * or encoded region name. If the region does not found, then it'll throw an 2443 * UnknownRegionException wrapped by a {@link CompletableFuture} 2444 * @param regionNameOrEncodedRegionName region name or encoded region name 2445 * @return region location, wrapped by a {@link CompletableFuture} 2446 */ 2447 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2448 if (regionNameOrEncodedRegionName == null) { 2449 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2450 } 2451 2452 CompletableFuture<Optional<HRegionLocation>> future; 2453 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2454 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2455 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2456 // old format encodedName, should be meta region 2457 future = connection.registry.getMetaRegionLocations() 2458 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2459 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2460 } else { 2461 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2462 regionNameOrEncodedRegionName); 2463 } 2464 } else { 2465 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2466 // it needs to throw out IllegalArgumentException in case tableName is passed in. 2467 RegionInfo regionInfo; 2468 try { 2469 regionInfo = 2470 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2471 } catch (IOException ioe) { 2472 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2473 } 2474 2475 if (regionInfo.isMetaRegion()) { 2476 future = connection.registry.getMetaRegionLocations() 2477 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2478 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2479 .findFirst()); 2480 } else { 2481 future = 2482 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2483 } 2484 } 2485 2486 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2487 addListener(future, (location, err) -> { 2488 if (err != null) { 2489 returnedFuture.completeExceptionally(err); 2490 return; 2491 } 2492 if (!location.isPresent() || location.get().getRegion() == null) { 2493 returnedFuture.completeExceptionally( 2494 new UnknownRegionException("Invalid region name or encoded region name: " 2495 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2496 } else { 2497 returnedFuture.complete(location.get()); 2498 } 2499 }); 2500 return returnedFuture; 2501 } 2502 2503 /** 2504 * Get the region info for the passed region name. The region name may be a full region name or 2505 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2506 * wrapped by a {@link CompletableFuture} 2507 * @return region info, wrapped by a {@link CompletableFuture} 2508 */ 2509 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2510 if (regionNameOrEncodedRegionName == null) { 2511 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2512 } 2513 2514 if ( 2515 Bytes.equals(regionNameOrEncodedRegionName, 2516 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2517 || Bytes.equals(regionNameOrEncodedRegionName, 2518 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2519 ) { 2520 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2521 } 2522 2523 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2524 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2525 if (err != null) { 2526 future.completeExceptionally(err); 2527 } else { 2528 future.complete(location.getRegion()); 2529 } 2530 }); 2531 return future; 2532 } 2533 2534 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2535 if (numRegions < 3) { 2536 throw new IllegalArgumentException("Must create at least three regions"); 2537 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2538 throw new IllegalArgumentException("Start key must be smaller than end key"); 2539 } 2540 if (numRegions == 3) { 2541 return new byte[][] { startKey, endKey }; 2542 } 2543 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2544 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2545 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2546 } 2547 return splitKeys; 2548 } 2549 2550 private void verifySplitKeys(byte[][] splitKeys) { 2551 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2552 // Verify there are no duplicate split keys 2553 byte[] lastKey = null; 2554 for (byte[] splitKey : splitKeys) { 2555 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2556 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2557 } 2558 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2559 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2560 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2561 } 2562 lastKey = splitKey; 2563 } 2564 } 2565 2566 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2567 2568 abstract void onFinished(); 2569 2570 abstract void onError(Throwable error); 2571 2572 @Override 2573 public void accept(Void v, Throwable error) { 2574 if (error != null) { 2575 onError(error); 2576 return; 2577 } 2578 onFinished(); 2579 } 2580 } 2581 2582 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2583 protected final TableName tableName; 2584 2585 TableProcedureBiConsumer(TableName tableName) { 2586 this.tableName = tableName; 2587 } 2588 2589 abstract String getOperationType(); 2590 2591 String getDescription() { 2592 return "Operation: " + getOperationType() + ", " + "Table Name: " 2593 + tableName.getNameWithNamespaceInclAsString(); 2594 } 2595 2596 @Override 2597 void onFinished() { 2598 LOG.info(getDescription() + " completed"); 2599 } 2600 2601 @Override 2602 void onError(Throwable error) { 2603 LOG.info(getDescription() + " failed with " + error.getMessage()); 2604 } 2605 } 2606 2607 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2608 protected final String namespaceName; 2609 2610 NamespaceProcedureBiConsumer(String namespaceName) { 2611 this.namespaceName = namespaceName; 2612 } 2613 2614 abstract String getOperationType(); 2615 2616 String getDescription() { 2617 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2618 } 2619 2620 @Override 2621 void onFinished() { 2622 LOG.info(getDescription() + " completed"); 2623 } 2624 2625 @Override 2626 void onError(Throwable error) { 2627 LOG.info(getDescription() + " failed with " + error.getMessage()); 2628 } 2629 } 2630 2631 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2632 2633 CreateTableProcedureBiConsumer(TableName tableName) { 2634 super(tableName); 2635 } 2636 2637 @Override 2638 String getOperationType() { 2639 return "CREATE"; 2640 } 2641 } 2642 2643 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2644 2645 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2646 super(tableName); 2647 } 2648 2649 @Override 2650 String getOperationType() { 2651 return "MODIFY"; 2652 } 2653 } 2654 2655 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2656 extends TableProcedureBiConsumer { 2657 2658 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2659 super(tableName); 2660 } 2661 2662 @Override 2663 String getOperationType() { 2664 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2665 } 2666 } 2667 2668 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2669 2670 DeleteTableProcedureBiConsumer(TableName tableName) { 2671 super(tableName); 2672 } 2673 2674 @Override 2675 String getOperationType() { 2676 return "DELETE"; 2677 } 2678 2679 @Override 2680 void onFinished() { 2681 connection.getLocator().clearCache(this.tableName); 2682 super.onFinished(); 2683 } 2684 } 2685 2686 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2687 2688 TruncateTableProcedureBiConsumer(TableName tableName) { 2689 super(tableName); 2690 } 2691 2692 @Override 2693 String getOperationType() { 2694 return "TRUNCATE"; 2695 } 2696 } 2697 2698 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2699 2700 EnableTableProcedureBiConsumer(TableName tableName) { 2701 super(tableName); 2702 } 2703 2704 @Override 2705 String getOperationType() { 2706 return "ENABLE"; 2707 } 2708 } 2709 2710 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2711 2712 DisableTableProcedureBiConsumer(TableName tableName) { 2713 super(tableName); 2714 } 2715 2716 @Override 2717 String getOperationType() { 2718 return "DISABLE"; 2719 } 2720 } 2721 2722 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2723 2724 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2725 super(tableName); 2726 } 2727 2728 @Override 2729 String getOperationType() { 2730 return "ADD_COLUMN_FAMILY"; 2731 } 2732 } 2733 2734 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2735 2736 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2737 super(tableName); 2738 } 2739 2740 @Override 2741 String getOperationType() { 2742 return "DELETE_COLUMN_FAMILY"; 2743 } 2744 } 2745 2746 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2747 2748 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2749 super(tableName); 2750 } 2751 2752 @Override 2753 String getOperationType() { 2754 return "MODIFY_COLUMN_FAMILY"; 2755 } 2756 } 2757 2758 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2759 extends TableProcedureBiConsumer { 2760 2761 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2762 super(tableName); 2763 } 2764 2765 @Override 2766 String getOperationType() { 2767 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2768 } 2769 } 2770 2771 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2772 2773 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2774 super(namespaceName); 2775 } 2776 2777 @Override 2778 String getOperationType() { 2779 return "CREATE_NAMESPACE"; 2780 } 2781 } 2782 2783 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2784 2785 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2786 super(namespaceName); 2787 } 2788 2789 @Override 2790 String getOperationType() { 2791 return "DELETE_NAMESPACE"; 2792 } 2793 } 2794 2795 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2796 2797 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2798 super(namespaceName); 2799 } 2800 2801 @Override 2802 String getOperationType() { 2803 return "MODIFY_NAMESPACE"; 2804 } 2805 } 2806 2807 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2808 2809 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2810 super(tableName); 2811 } 2812 2813 @Override 2814 String getOperationType() { 2815 return "MERGE_REGIONS"; 2816 } 2817 } 2818 2819 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2820 2821 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2822 super(tableName); 2823 } 2824 2825 @Override 2826 String getOperationType() { 2827 return "SPLIT_REGION"; 2828 } 2829 } 2830 2831 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 2832 SnapshotProcedureBiConsumer(TableName tableName) { 2833 super(tableName); 2834 } 2835 2836 @Override 2837 String getOperationType() { 2838 return "SNAPSHOT"; 2839 } 2840 } 2841 2842 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2843 private final String peerId; 2844 private final Supplier<String> getOperation; 2845 2846 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2847 this.peerId = peerId; 2848 this.getOperation = getOperation; 2849 } 2850 2851 String getDescription() { 2852 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2853 } 2854 2855 @Override 2856 void onFinished() { 2857 LOG.info(getDescription() + " completed"); 2858 } 2859 2860 @Override 2861 void onError(Throwable error) { 2862 LOG.info(getDescription() + " failed with " + error.getMessage()); 2863 } 2864 } 2865 2866 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2867 CompletableFuture<Void> future = new CompletableFuture<>(); 2868 addListener(procFuture, (procId, error) -> { 2869 if (error != null) { 2870 future.completeExceptionally(error); 2871 return; 2872 } 2873 getProcedureResult(procId, future, 0); 2874 }); 2875 return future; 2876 } 2877 2878 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2879 addListener( 2880 this.<GetProcedureResultResponse> newMasterCaller() 2881 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 2882 GetProcedureResultResponse> call(controller, stub, 2883 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2884 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2885 .call(), 2886 (response, error) -> { 2887 if (error != null) { 2888 LOG.warn("failed to get the procedure result procId={}", procId, 2889 ConnectionUtils.translateException(error)); 2890 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2891 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2892 return; 2893 } 2894 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2895 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2896 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2897 return; 2898 } 2899 if (response.hasException()) { 2900 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2901 future.completeExceptionally(ioe); 2902 } else { 2903 future.complete(null); 2904 } 2905 }); 2906 } 2907 2908 private <T> CompletableFuture<T> failedFuture(Throwable error) { 2909 CompletableFuture<T> future = new CompletableFuture<>(); 2910 future.completeExceptionally(error); 2911 return future; 2912 } 2913 2914 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2915 if (error != null) { 2916 future.completeExceptionally(error); 2917 return true; 2918 } 2919 return false; 2920 } 2921 2922 @Override 2923 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2924 return getClusterMetrics(EnumSet.allOf(Option.class)); 2925 } 2926 2927 @Override 2928 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2929 return this.<ClusterMetrics> newMasterCaller() 2930 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 2931 ClusterMetrics> call(controller, stub, 2932 RequestConverter.buildGetClusterStatusRequest(options), 2933 (s, c, req, done) -> s.getClusterStatus(c, req, done), 2934 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 2935 .call(); 2936 } 2937 2938 @Override 2939 public CompletableFuture<Void> shutdown() { 2940 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2941 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2942 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 2943 resp -> null)) 2944 .call(); 2945 } 2946 2947 @Override 2948 public CompletableFuture<Void> stopMaster() { 2949 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2950 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 2951 controller, stub, StopMasterRequest.newBuilder().build(), 2952 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 2953 .call(); 2954 } 2955 2956 @Override 2957 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2958 StopServerRequest request = RequestConverter 2959 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 2960 return this.<Void> newAdminCaller().priority(HIGH_QOS) 2961 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2962 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2963 resp -> null)) 2964 .serverName(serverName).call(); 2965 } 2966 2967 @Override 2968 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2969 return this.<Void> newAdminCaller() 2970 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 2971 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2972 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2973 .serverName(serverName).call(); 2974 } 2975 2976 @Override 2977 public CompletableFuture<Void> updateConfiguration() { 2978 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2979 addListener( 2980 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 2981 (status, err) -> { 2982 if (err != null) { 2983 future.completeExceptionally(err); 2984 } else { 2985 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2986 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 2987 futures.add(updateConfiguration(status.getMasterName())); 2988 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2989 addListener( 2990 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2991 (result, err2) -> { 2992 if (err2 != null) { 2993 future.completeExceptionally(err2); 2994 } else { 2995 future.complete(result); 2996 } 2997 }); 2998 } 2999 }); 3000 return future; 3001 } 3002 3003 @Override 3004 public CompletableFuture<Void> updateConfiguration(String groupName) { 3005 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3006 addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3007 if (err != null) { 3008 future.completeExceptionally(err); 3009 } else if (rsGroupInfo == null) { 3010 future.completeExceptionally( 3011 new IllegalArgumentException("Group does not exist: " + groupName)); 3012 } else { 3013 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3014 if (err2 != null) { 3015 future.completeExceptionally(err2); 3016 } else { 3017 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3018 List<ServerName> groupServers = status.getServersName().stream() 3019 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3020 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3021 addListener( 3022 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3023 (result, err3) -> { 3024 if (err3 != null) { 3025 future.completeExceptionally(err3); 3026 } else { 3027 future.complete(result); 3028 } 3029 }); 3030 } 3031 }); 3032 } 3033 }); 3034 return future; 3035 } 3036 3037 @Override 3038 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3039 return this.<Void> newAdminCaller() 3040 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3041 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3042 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3043 .serverName(serverName).call(); 3044 } 3045 3046 @Override 3047 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3048 return this.<Void> newAdminCaller() 3049 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3050 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3051 RequestConverter.buildClearCompactionQueuesRequest(queues), 3052 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3053 .serverName(serverName).call(); 3054 } 3055 3056 @Override 3057 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3058 return this.<List<SecurityCapability>> newMasterCaller() 3059 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3060 List<SecurityCapability>> call(controller, stub, 3061 SecurityCapabilitiesRequest.newBuilder().build(), 3062 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3063 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3064 .call(); 3065 } 3066 3067 @Override 3068 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3069 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3070 } 3071 3072 @Override 3073 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3074 TableName tableName) { 3075 Preconditions.checkNotNull(tableName, 3076 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3077 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3078 } 3079 3080 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3081 ServerName serverName) { 3082 return this.<List<RegionMetrics>> newAdminCaller() 3083 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3084 List<RegionMetrics>> adminCall(controller, stub, request, 3085 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3086 RegionMetricsBuilder::toRegionMetrics)) 3087 .serverName(serverName).call(); 3088 } 3089 3090 @Override 3091 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3092 return this.<Boolean> newMasterCaller() 3093 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3094 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3095 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3096 resp -> resp.getInMaintenanceMode())) 3097 .call(); 3098 } 3099 3100 @Override 3101 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3102 CompactType compactType) { 3103 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3104 3105 switch (compactType) { 3106 case MOB: 3107 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3108 if (err != null) { 3109 future.completeExceptionally(err); 3110 return; 3111 } 3112 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3113 3114 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3115 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3116 GetRegionInfoResponse> adminCall(controller, stub, 3117 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3118 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3119 .call(), (resp2, err2) -> { 3120 if (err2 != null) { 3121 future.completeExceptionally(err2); 3122 } else { 3123 if (resp2.hasCompactionState()) { 3124 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3125 } else { 3126 future.complete(CompactionState.NONE); 3127 } 3128 } 3129 }); 3130 }); 3131 break; 3132 case NORMAL: 3133 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3134 if (err != null) { 3135 future.completeExceptionally(err); 3136 return; 3137 } 3138 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3139 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3140 locations.stream().filter(loc -> loc.getServerName() != null) 3141 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3142 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3143 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3144 // If any region compaction state is MAJOR_AND_MINOR 3145 // the table compaction state is MAJOR_AND_MINOR, too. 3146 if (err2 != null) { 3147 future.completeExceptionally(unwrapCompletionException(err2)); 3148 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3149 future.complete(regionState); 3150 } else { 3151 regionStates.add(regionState); 3152 } 3153 })); 3154 }); 3155 addListener( 3156 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3157 (ret, err3) -> { 3158 // If future not completed, check all regions's compaction state 3159 if (!future.isCompletedExceptionally() && !future.isDone()) { 3160 CompactionState state = CompactionState.NONE; 3161 for (CompactionState regionState : regionStates) { 3162 switch (regionState) { 3163 case MAJOR: 3164 if (state == CompactionState.MINOR) { 3165 future.complete(CompactionState.MAJOR_AND_MINOR); 3166 } else { 3167 state = CompactionState.MAJOR; 3168 } 3169 break; 3170 case MINOR: 3171 if (state == CompactionState.MAJOR) { 3172 future.complete(CompactionState.MAJOR_AND_MINOR); 3173 } else { 3174 state = CompactionState.MINOR; 3175 } 3176 break; 3177 case NONE: 3178 default: 3179 } 3180 } 3181 if (!future.isDone()) { 3182 future.complete(state); 3183 } 3184 } 3185 }); 3186 }); 3187 break; 3188 default: 3189 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3190 } 3191 3192 return future; 3193 } 3194 3195 @Override 3196 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3197 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3198 addListener(getRegionLocation(regionName), (location, err) -> { 3199 if (err != null) { 3200 future.completeExceptionally(err); 3201 return; 3202 } 3203 ServerName serverName = location.getServerName(); 3204 if (serverName == null) { 3205 future 3206 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3207 return; 3208 } 3209 addListener( 3210 this.<GetRegionInfoResponse> newAdminCaller() 3211 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3212 GetRegionInfoResponse> adminCall(controller, stub, 3213 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3214 true), 3215 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3216 .serverName(serverName).call(), 3217 (resp2, err2) -> { 3218 if (err2 != null) { 3219 future.completeExceptionally(err2); 3220 } else { 3221 if (resp2.hasCompactionState()) { 3222 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3223 } else { 3224 future.complete(CompactionState.NONE); 3225 } 3226 } 3227 }); 3228 }); 3229 return future; 3230 } 3231 3232 @Override 3233 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3234 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3235 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3236 return this.<Optional<Long>> newMasterCaller() 3237 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3238 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3239 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3240 ProtobufUtil::toOptionalTimestamp)) 3241 .call(); 3242 } 3243 3244 @Override 3245 public CompletableFuture<Optional<Long>> 3246 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3247 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3248 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3249 addListener(getRegionInfo(regionName), (region, err) -> { 3250 if (err != null) { 3251 future.completeExceptionally(err); 3252 return; 3253 } 3254 MajorCompactionTimestampForRegionRequest.Builder builder = 3255 MajorCompactionTimestampForRegionRequest.newBuilder(); 3256 builder.setRegion( 3257 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3258 addListener(this.<Optional<Long>> newMasterCaller() 3259 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3260 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3261 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3262 ProtobufUtil::toOptionalTimestamp)) 3263 .call(), (timestamp, err2) -> { 3264 if (err2 != null) { 3265 future.completeExceptionally(err2); 3266 } else { 3267 future.complete(timestamp); 3268 } 3269 }); 3270 }); 3271 return future; 3272 } 3273 3274 @Override 3275 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3276 List<String> serverNamesList) { 3277 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3278 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3279 if (err != null) { 3280 future.completeExceptionally(err); 3281 return; 3282 } 3283 // Accessed by multiple threads. 3284 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3285 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3286 serverNames.stream().forEach(serverName -> { 3287 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3288 if (err2 != null) { 3289 future.completeExceptionally(unwrapCompletionException(err2)); 3290 } else { 3291 serverStates.put(serverName, serverState); 3292 } 3293 })); 3294 }); 3295 addListener( 3296 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3297 (ret, err3) -> { 3298 if (!future.isCompletedExceptionally()) { 3299 if (err3 != null) { 3300 future.completeExceptionally(err3); 3301 } else { 3302 future.complete(serverStates); 3303 } 3304 } 3305 }); 3306 }); 3307 return future; 3308 } 3309 3310 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3311 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3312 if (serverNamesList.isEmpty()) { 3313 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3314 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3315 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3316 if (err != null) { 3317 future.completeExceptionally(err); 3318 } else { 3319 future.complete(clusterMetrics.getServersName()); 3320 } 3321 }); 3322 return future; 3323 } else { 3324 List<ServerName> serverList = new ArrayList<>(); 3325 for (String regionServerName : serverNamesList) { 3326 ServerName serverName = null; 3327 try { 3328 serverName = ServerName.valueOf(regionServerName); 3329 } catch (Exception e) { 3330 future.completeExceptionally( 3331 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3332 } 3333 if (serverName == null) { 3334 future.completeExceptionally( 3335 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3336 } else { 3337 serverList.add(serverName); 3338 } 3339 } 3340 future.complete(serverList); 3341 } 3342 return future; 3343 } 3344 3345 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3346 return this.<Boolean> newAdminCaller().serverName(serverName) 3347 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3348 Boolean> adminCall(controller, stub, 3349 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3350 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3351 .call(); 3352 } 3353 3354 @Override 3355 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3356 return this.<Boolean> newMasterCaller() 3357 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3358 Boolean> call(controller, stub, 3359 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3360 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3361 (resp) -> resp.getPrevBalanceValue())) 3362 .call(); 3363 } 3364 3365 @Override 3366 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3367 return this.<BalanceResponse> newMasterCaller() 3368 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3369 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3370 (s, c, req, done) -> s.balance(c, req, done), 3371 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3372 .call(); 3373 } 3374 3375 @Override 3376 public CompletableFuture<Boolean> isBalancerEnabled() { 3377 return this.<Boolean> newMasterCaller() 3378 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3379 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3380 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3381 .call(); 3382 } 3383 3384 @Override 3385 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3386 return this.<Boolean> newMasterCaller() 3387 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3388 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3389 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3390 (resp) -> resp.getPrevNormalizerValue())) 3391 .call(); 3392 } 3393 3394 @Override 3395 public CompletableFuture<Boolean> isNormalizerEnabled() { 3396 return this.<Boolean> newMasterCaller() 3397 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3398 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3399 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3400 .call(); 3401 } 3402 3403 @Override 3404 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3405 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3406 } 3407 3408 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3409 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3410 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3411 } 3412 3413 @Override 3414 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3415 return this.<Boolean> newMasterCaller() 3416 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3417 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3418 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3419 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3420 (resp) -> resp.getPrevValue())) 3421 .call(); 3422 } 3423 3424 @Override 3425 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3426 return this.<Boolean> newMasterCaller() 3427 .action( 3428 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3429 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3430 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3431 .call(); 3432 } 3433 3434 @Override 3435 public CompletableFuture<Boolean> runCleanerChore() { 3436 return this.<Boolean> newMasterCaller() 3437 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3438 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3439 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3440 (resp) -> resp.getCleanerChoreRan())) 3441 .call(); 3442 } 3443 3444 @Override 3445 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3446 return this.<Boolean> newMasterCaller() 3447 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3448 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3449 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3450 .call(); 3451 } 3452 3453 @Override 3454 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3455 return this.<Boolean> newMasterCaller() 3456 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3457 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3458 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3459 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3460 .call(); 3461 } 3462 3463 @Override 3464 public CompletableFuture<Integer> runCatalogJanitor() { 3465 return this.<Integer> newMasterCaller() 3466 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3467 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3468 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3469 .call(); 3470 } 3471 3472 @Override 3473 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3474 ServiceCaller<S, R> callable) { 3475 MasterCoprocessorRpcChannelImpl channel = 3476 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3477 S stub = stubMaker.apply(channel); 3478 CompletableFuture<R> future = new CompletableFuture<>(); 3479 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3480 callable.call(stub, controller, resp -> { 3481 if (controller.failed()) { 3482 future.completeExceptionally(controller.getFailed()); 3483 } else { 3484 future.complete(resp); 3485 } 3486 }); 3487 return future; 3488 } 3489 3490 @Override 3491 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3492 ServiceCaller<S, R> callable, ServerName serverName) { 3493 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3494 this.<Message> newServerCaller().serverName(serverName)); 3495 S stub = stubMaker.apply(channel); 3496 CompletableFuture<R> future = new CompletableFuture<>(); 3497 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3498 callable.call(stub, controller, resp -> { 3499 if (controller.failed()) { 3500 future.completeExceptionally(controller.getFailed()); 3501 } else { 3502 future.complete(resp); 3503 } 3504 }); 3505 return future; 3506 } 3507 3508 @Override 3509 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3510 return this.<List<ServerName>> newMasterCaller() 3511 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3512 List<ServerName>> call(controller, stub, 3513 RequestConverter.buildClearDeadServersRequest(servers), 3514 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3515 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3516 .call(); 3517 } 3518 3519 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3520 return this.connection.callerFactory.<T> serverRequest() 3521 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3522 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3523 .pause(pauseNs, TimeUnit.NANOSECONDS) 3524 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3525 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3526 } 3527 3528 @Override 3529 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3530 if (tableName == null) { 3531 return failedFuture(new IllegalArgumentException("Table name is null")); 3532 } 3533 CompletableFuture<Void> future = new CompletableFuture<>(); 3534 addListener(tableExists(tableName), (exist, err) -> { 3535 if (err != null) { 3536 future.completeExceptionally(err); 3537 return; 3538 } 3539 if (!exist) { 3540 future.completeExceptionally(new TableNotFoundException( 3541 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3542 return; 3543 } 3544 addListener(getTableSplits(tableName), (splits, err1) -> { 3545 if (err1 != null) { 3546 future.completeExceptionally(err1); 3547 } else { 3548 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3549 if (err2 != null) { 3550 future.completeExceptionally(err2); 3551 } else { 3552 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3553 if (err3 != null) { 3554 future.completeExceptionally(err3); 3555 } else { 3556 future.complete(result3); 3557 } 3558 }); 3559 } 3560 }); 3561 } 3562 }); 3563 }); 3564 return future; 3565 } 3566 3567 @Override 3568 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3569 if (tableName == null) { 3570 return failedFuture(new IllegalArgumentException("Table name is null")); 3571 } 3572 CompletableFuture<Void> future = new CompletableFuture<>(); 3573 addListener(tableExists(tableName), (exist, err) -> { 3574 if (err != null) { 3575 future.completeExceptionally(err); 3576 return; 3577 } 3578 if (!exist) { 3579 future.completeExceptionally(new TableNotFoundException( 3580 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3581 return; 3582 } 3583 addListener(setTableReplication(tableName, false), (result, err2) -> { 3584 if (err2 != null) { 3585 future.completeExceptionally(err2); 3586 } else { 3587 future.complete(result); 3588 } 3589 }); 3590 }); 3591 return future; 3592 } 3593 3594 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3595 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3596 addListener( 3597 getRegions(tableName).thenApply(regions -> regions.stream() 3598 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3599 (regions, err2) -> { 3600 if (err2 != null) { 3601 future.completeExceptionally(err2); 3602 return; 3603 } 3604 if (regions.size() == 1) { 3605 future.complete(null); 3606 } else { 3607 byte[][] splits = new byte[regions.size() - 1][]; 3608 for (int i = 1; i < regions.size(); i++) { 3609 splits[i - 1] = regions.get(i).getStartKey(); 3610 } 3611 future.complete(splits); 3612 } 3613 }); 3614 return future; 3615 } 3616 3617 /** 3618 * Connect to peer and check the table descriptor on peer: 3619 * <ol> 3620 * <li>Create the same table on peer when not exist.</li> 3621 * <li>Throw an exception if the table already has replication enabled on any of the column 3622 * families.</li> 3623 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3624 * </ol> 3625 * @param tableName name of the table to sync to the peer 3626 * @param splits table split keys 3627 */ 3628 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3629 byte[][] splits) { 3630 CompletableFuture<Void> future = new CompletableFuture<>(); 3631 addListener(listReplicationPeers(), (peers, err) -> { 3632 if (err != null) { 3633 future.completeExceptionally(err); 3634 return; 3635 } 3636 if (peers == null || peers.size() <= 0) { 3637 future.completeExceptionally( 3638 new IllegalArgumentException("Found no peer cluster for replication.")); 3639 return; 3640 } 3641 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3642 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3643 .forEach(peer -> { 3644 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3645 }); 3646 addListener( 3647 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3648 (result, err2) -> { 3649 if (err2 != null) { 3650 future.completeExceptionally(err2); 3651 } else { 3652 future.complete(result); 3653 } 3654 }); 3655 }); 3656 return future; 3657 } 3658 3659 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3660 ReplicationPeerDescription peer) { 3661 Configuration peerConf = null; 3662 try { 3663 peerConf = 3664 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3665 } catch (IOException e) { 3666 return failedFuture(e); 3667 } 3668 CompletableFuture<Void> future = new CompletableFuture<>(); 3669 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3670 if (err != null) { 3671 future.completeExceptionally(err); 3672 return; 3673 } 3674 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3675 if (err1 != null) { 3676 future.completeExceptionally(err1); 3677 return; 3678 } 3679 AsyncAdmin peerAdmin = conn.getAdmin(); 3680 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3681 if (err2 != null) { 3682 future.completeExceptionally(err2); 3683 return; 3684 } 3685 if (!exist) { 3686 CompletableFuture<Void> createTableFuture = null; 3687 if (splits == null) { 3688 createTableFuture = peerAdmin.createTable(tableDesc); 3689 } else { 3690 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3691 } 3692 addListener(createTableFuture, (result, err3) -> { 3693 if (err3 != null) { 3694 future.completeExceptionally(err3); 3695 } else { 3696 future.complete(result); 3697 } 3698 }); 3699 } else { 3700 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3701 (result, err4) -> { 3702 if (err4 != null) { 3703 future.completeExceptionally(err4); 3704 } else { 3705 future.complete(result); 3706 } 3707 }); 3708 } 3709 }); 3710 }); 3711 }); 3712 return future; 3713 } 3714 3715 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3716 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3717 CompletableFuture<Void> future = new CompletableFuture<>(); 3718 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3719 if (err != null) { 3720 future.completeExceptionally(err); 3721 return; 3722 } 3723 if (peerTableDesc == null) { 3724 future.completeExceptionally( 3725 new IllegalArgumentException("Failed to get table descriptor for table " 3726 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3727 return; 3728 } 3729 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3730 future.completeExceptionally(new IllegalArgumentException( 3731 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3732 + ", but the table descriptors are not same when compared with source cluster." 3733 + " Thus can not enable the table's replication switch.")); 3734 return; 3735 } 3736 future.complete(null); 3737 }); 3738 return future; 3739 } 3740 3741 /** 3742 * Set the table's replication switch if the table's replication switch is already not set. 3743 * @param tableName name of the table 3744 * @param enableRep is replication switch enable or disable 3745 */ 3746 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3747 CompletableFuture<Void> future = new CompletableFuture<>(); 3748 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3749 if (err != null) { 3750 future.completeExceptionally(err); 3751 return; 3752 } 3753 if (!tableDesc.matchReplicationScope(enableRep)) { 3754 int scope = 3755 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3756 TableDescriptor newTableDesc = 3757 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3758 addListener(modifyTable(newTableDesc), (result, err2) -> { 3759 if (err2 != null) { 3760 future.completeExceptionally(err2); 3761 } else { 3762 future.complete(result); 3763 } 3764 }); 3765 } else { 3766 future.complete(null); 3767 } 3768 }); 3769 return future; 3770 } 3771 3772 @Override 3773 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 3774 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 3775 request.setPeerId(peerId); 3776 return this.<Boolean> newMasterCaller() 3777 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 3778 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 3779 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 3780 resp -> resp.getIsEnabled())) 3781 .call(); 3782 } 3783 3784 private void waitUntilAllReplicationPeerModificationProceduresDone( 3785 CompletableFuture<Boolean> future, boolean prevOn, int retries) { 3786 CompletableFuture<List<ProcedureProtos.Procedure>> callFuture = 3787 this.<List<ProcedureProtos.Procedure>> newMasterCaller() 3788 .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest, 3789 GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call( 3790 controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), 3791 (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), 3792 resp -> resp.getProcedureList())) 3793 .call(); 3794 addListener(callFuture, (r, e) -> { 3795 if (e != null) { 3796 future.completeExceptionally(e); 3797 } else if (r.isEmpty()) { 3798 // we are done 3799 future.complete(prevOn); 3800 } else { 3801 // retry later to see if the procedures are done 3802 retryTimer.newTimeout( 3803 t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), 3804 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3805 } 3806 }); 3807 } 3808 3809 @Override 3810 public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, 3811 boolean drainProcedures) { 3812 ReplicationPeerModificationSwitchRequest request = 3813 ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); 3814 CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller() 3815 .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest, 3816 ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request, 3817 (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), 3818 resp -> resp.getPreviousValue())) 3819 .call(); 3820 // if we do not need to wait all previous peer modification procedure done, or we are enabling 3821 // peer modification, just return here. 3822 if (!drainProcedures || on) { 3823 return callFuture; 3824 } 3825 // otherwise we need to wait until all previous peer modification procedure done 3826 CompletableFuture<Boolean> future = new CompletableFuture<>(); 3827 addListener(callFuture, (prevOn, err) -> { 3828 if (err != null) { 3829 future.completeExceptionally(err); 3830 return; 3831 } 3832 // even if the previous state is disabled, we still need to wait here, as there could be 3833 // another client thread which called this method just before us and have already changed the 3834 // state to off, but there are still peer modification procedures not finished, so we should 3835 // also wait here. 3836 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 3837 }); 3838 return future; 3839 } 3840 3841 @Override 3842 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 3843 return this.<Boolean> newMasterCaller() 3844 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 3845 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 3846 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 3847 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 3848 (resp) -> resp.getEnabled())) 3849 .call(); 3850 } 3851 3852 @Override 3853 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3854 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3855 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3856 if (err != null) { 3857 future.completeExceptionally(err); 3858 return; 3859 } 3860 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3861 locations.stream().filter(l -> l.getRegion() != null) 3862 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3863 .collect(Collectors.groupingBy(l -> l.getServerName(), 3864 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3865 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3866 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3867 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3868 futures 3869 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3870 if (err2 != null) { 3871 future.completeExceptionally(unwrapCompletionException(err2)); 3872 } else { 3873 aggregator.append(stats); 3874 } 3875 })); 3876 } 3877 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3878 (ret, err3) -> { 3879 if (err3 != null) { 3880 future.completeExceptionally(unwrapCompletionException(err3)); 3881 } else { 3882 future.complete(aggregator.sum()); 3883 } 3884 }); 3885 }); 3886 return future; 3887 } 3888 3889 @Override 3890 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 3891 boolean preserveSplits) { 3892 CompletableFuture<Void> future = new CompletableFuture<>(); 3893 addListener(tableExists(tableName), (exist, err) -> { 3894 if (err != null) { 3895 future.completeExceptionally(err); 3896 return; 3897 } 3898 if (!exist) { 3899 future.completeExceptionally(new TableNotFoundException(tableName)); 3900 return; 3901 } 3902 addListener(tableExists(newTableName), (exist1, err1) -> { 3903 if (err1 != null) { 3904 future.completeExceptionally(err1); 3905 return; 3906 } 3907 if (exist1) { 3908 future.completeExceptionally(new TableExistsException(newTableName)); 3909 return; 3910 } 3911 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 3912 if (err2 != null) { 3913 future.completeExceptionally(err2); 3914 return; 3915 } 3916 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 3917 if (preserveSplits) { 3918 addListener(getTableSplits(tableName), (splits, err3) -> { 3919 if (err3 != null) { 3920 future.completeExceptionally(err3); 3921 } else { 3922 addListener( 3923 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 3924 (result, err4) -> { 3925 if (err4 != null) { 3926 future.completeExceptionally(err4); 3927 } else { 3928 future.complete(result); 3929 } 3930 }); 3931 } 3932 }); 3933 } else { 3934 addListener(createTable(newTableDesc), (result, err5) -> { 3935 if (err5 != null) { 3936 future.completeExceptionally(err5); 3937 } else { 3938 future.complete(result); 3939 } 3940 }); 3941 } 3942 }); 3943 }); 3944 }); 3945 return future; 3946 } 3947 3948 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 3949 List<RegionInfo> hris) { 3950 return this.<CacheEvictionStats> newAdminCaller() 3951 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 3952 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 3953 RequestConverter.buildClearRegionBlockCacheRequest(hris), 3954 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 3955 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 3956 .serverName(serverName).call(); 3957 } 3958 3959 @Override 3960 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 3961 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3962 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 3963 Boolean> call(controller, stub, 3964 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 3965 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 3966 resp -> resp.getPreviousRpcThrottleEnabled())) 3967 .call(); 3968 return future; 3969 } 3970 3971 @Override 3972 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 3973 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3974 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 3975 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 3976 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 3977 resp -> resp.getRpcThrottleEnabled())) 3978 .call(); 3979 return future; 3980 } 3981 3982 @Override 3983 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 3984 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3985 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 3986 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 3987 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 3988 .build(), 3989 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 3990 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 3991 .call(); 3992 return future; 3993 } 3994 3995 @Override 3996 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 3997 return this.<Map<TableName, Long>> newMasterCaller() 3998 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 3999 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4000 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4001 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4002 resp -> resp.getSizesList().stream().collect(Collectors 4003 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4004 .call(); 4005 } 4006 4007 @Override 4008 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4009 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4010 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4011 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4012 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4013 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4014 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4015 resp -> resp.getSnapshotsList().stream() 4016 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 4017 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4018 .serverName(serverName).call(); 4019 } 4020 4021 private CompletableFuture<SpaceQuotaSnapshot> 4022 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4023 return this.<SpaceQuotaSnapshot> newMasterCaller() 4024 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4025 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4026 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4027 .call(); 4028 } 4029 4030 @Override 4031 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4032 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4033 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4034 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4035 } 4036 4037 @Override 4038 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4039 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4040 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4041 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4042 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4043 } 4044 4045 @Override 4046 public CompletableFuture<Void> grant(UserPermission userPermission, 4047 boolean mergeExistingPermissions) { 4048 return this.<Void> newMasterCaller() 4049 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4050 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4051 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4052 .call(); 4053 } 4054 4055 @Override 4056 public CompletableFuture<Void> revoke(UserPermission userPermission) { 4057 return this.<Void> newMasterCaller() 4058 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4059 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4060 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4061 .call(); 4062 } 4063 4064 @Override 4065 public CompletableFuture<List<UserPermission>> 4066 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4067 return this.<List<UserPermission>> newMasterCaller() 4068 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4069 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4070 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4071 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4072 resp -> resp.getUserPermissionList().stream() 4073 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4074 .collect(Collectors.toList()))) 4075 .call(); 4076 } 4077 4078 @Override 4079 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4080 List<Permission> permissions) { 4081 return this.<List<Boolean>> newMasterCaller() 4082 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4083 List<Boolean>> call(controller, stub, 4084 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4085 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4086 resp -> resp.getHasUserPermissionList())) 4087 .call(); 4088 } 4089 4090 @Override 4091 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4092 return this.<Boolean> newMasterCaller() 4093 .action((controller, stub) -> this.call(controller, stub, 4094 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4095 MasterService.Interface::switchSnapshotCleanup, 4096 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4097 .call(); 4098 } 4099 4100 @Override 4101 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4102 return this.<Boolean> newMasterCaller() 4103 .action((controller, stub) -> this.call(controller, stub, 4104 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4105 MasterService.Interface::isSnapshotCleanupEnabled, 4106 IsSnapshotCleanupEnabledResponse::getEnabled)) 4107 .call(); 4108 } 4109 4110 @Override 4111 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4112 return this.<Void> newMasterCaller() 4113 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4114 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4115 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4116 .call(); 4117 } 4118 4119 @Override 4120 public CompletableFuture<Void> addRSGroup(String groupName) { 4121 return this.<Void> newMasterCaller() 4122 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4123 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4124 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4125 .call(); 4126 } 4127 4128 @Override 4129 public CompletableFuture<Void> removeRSGroup(String groupName) { 4130 return this.<Void> newMasterCaller() 4131 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4132 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4133 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4134 .call(); 4135 } 4136 4137 @Override 4138 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4139 BalanceRequest request) { 4140 return this.<BalanceResponse> newMasterCaller() 4141 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4142 BalanceResponse> call(controller, stub, 4143 ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4144 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4145 .call(); 4146 } 4147 4148 @Override 4149 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4150 return this.<List<RSGroupInfo>> newMasterCaller() 4151 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4152 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4153 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4154 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4155 .call(); 4156 } 4157 4158 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4159 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4160 final String logType) { 4161 if (CollectionUtils.isEmpty(serverNames)) { 4162 return CompletableFuture.completedFuture(Collections.emptyList()); 4163 } 4164 return CompletableFuture.supplyAsync(() -> serverNames 4165 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4166 filterParams, limit, logType)) 4167 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4168 } 4169 4170 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4171 Map<String, Object> filterParams, int limit, String logType) { 4172 return this.<List<LogEntry>> newAdminCaller() 4173 .action((controller, stub) -> this.adminCall(controller, stub, 4174 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4175 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4176 .serverName(serverName).call(); 4177 } 4178 4179 @Override 4180 public CompletableFuture<List<Boolean>> 4181 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4182 if (CollectionUtils.isEmpty(serverNames)) { 4183 return CompletableFuture.completedFuture(Collections.emptyList()); 4184 } 4185 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4186 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4187 return convertToFutureOfList(clearSlowLogResponseList); 4188 } 4189 4190 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4191 return this.<Boolean> newAdminCaller() 4192 .action((controller, stub) -> this.adminCall(controller, stub, 4193 RequestConverter.buildClearSlowLogResponseRequest(), 4194 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4195 .serverName(serverName).call(); 4196 } 4197 4198 private static <T> CompletableFuture<List<T>> 4199 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4200 CompletableFuture<Void> allDoneFuture = 4201 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4202 return allDoneFuture 4203 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4204 } 4205 4206 @Override 4207 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4208 return this.<List<TableName>> newMasterCaller() 4209 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4210 List<TableName>> call(controller, stub, 4211 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4212 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4213 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4214 .call(); 4215 } 4216 4217 @Override 4218 public CompletableFuture<Pair<List<String>, List<TableName>>> 4219 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4220 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4221 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4222 GetConfiguredNamespacesAndTablesInRSGroupResponse, 4223 Pair<List<String>, List<TableName>>> call(controller, stub, 4224 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4225 .build(), 4226 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4227 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4228 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4229 .call(); 4230 } 4231 4232 @Override 4233 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4234 return this.<RSGroupInfo> newMasterCaller() 4235 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4236 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4237 GetRSGroupInfoOfServerRequest.newBuilder() 4238 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4239 .setPort(hostPort.getPort()).build()) 4240 .build(), 4241 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4242 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4243 .call(); 4244 } 4245 4246 @Override 4247 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4248 return this.<Void> newMasterCaller() 4249 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4250 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4251 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4252 .call(); 4253 } 4254 4255 @Override 4256 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4257 CompletableFuture<Void> future = new CompletableFuture<>(); 4258 for (TableName tableName : tables) { 4259 addListener(tableExists(tableName), (exist, err) -> { 4260 if (err != null) { 4261 future.completeExceptionally(err); 4262 return; 4263 } 4264 if (!exist) { 4265 future.completeExceptionally(new TableNotFoundException(tableName)); 4266 return; 4267 } 4268 }); 4269 } 4270 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4271 if (err != null) { 4272 future.completeExceptionally(err); 4273 return; 4274 } 4275 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4276 future.complete(null); 4277 return; 4278 } 4279 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4280 for (TableDescriptor td : tableDescriptions) { 4281 newTableDescriptors 4282 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4283 } 4284 addListener( 4285 CompletableFuture.allOf( 4286 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4287 (v, e) -> { 4288 if (e != null) { 4289 future.completeExceptionally(e); 4290 } else { 4291 future.complete(v); 4292 } 4293 }); 4294 }); 4295 return future; 4296 } 4297 4298 @Override 4299 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4300 return this.<RSGroupInfo> newMasterCaller() 4301 .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4302 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4303 GetRSGroupInfoOfTableRequest.newBuilder() 4304 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4305 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4306 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4307 .call(); 4308 } 4309 4310 @Override 4311 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4312 return this.<RSGroupInfo> newMasterCaller() 4313 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4314 RSGroupInfo> call(controller, stub, 4315 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4316 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4317 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4318 .call(); 4319 } 4320 4321 @Override 4322 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4323 return this.<Void> newMasterCaller() 4324 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4325 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4326 .setNewRsgroupName(newName).build(), 4327 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4328 .call(); 4329 } 4330 4331 @Override 4332 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4333 Map<String, String> configuration) { 4334 UpdateRSGroupConfigRequest.Builder request = 4335 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4336 if (configuration != null) { 4337 configuration.entrySet().forEach(e -> request.addConfiguration( 4338 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4339 } 4340 return this.<Void> newMasterCaller() 4341 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4342 Void> call(controller, stub, request.build(), 4343 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4344 .call(); 4345 } 4346 4347 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4348 return this.<List<LogEntry>> newMasterCaller() 4349 .action((controller, stub) -> this.call(controller, stub, 4350 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4351 ProtobufUtil::toBalancerDecisionResponse)) 4352 .call(); 4353 } 4354 4355 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4356 return this.<List<LogEntry>> newMasterCaller() 4357 .action((controller, stub) -> this.call(controller, stub, 4358 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4359 ProtobufUtil::toBalancerRejectionResponse)) 4360 .call(); 4361 } 4362 4363 @Override 4364 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4365 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4366 if (logType == null || serverType == null) { 4367 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4368 } 4369 switch (logType) { 4370 case "SLOW_LOG": 4371 case "LARGE_LOG": 4372 if (ServerType.MASTER.equals(serverType)) { 4373 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4374 } 4375 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4376 case "BALANCER_DECISION": 4377 if (ServerType.REGION_SERVER.equals(serverType)) { 4378 throw new IllegalArgumentException( 4379 "Balancer Decision logs are not maintained by HRegionServer"); 4380 } 4381 return getBalancerDecisions(limit); 4382 case "BALANCER_REJECTION": 4383 if (ServerType.REGION_SERVER.equals(serverType)) { 4384 throw new IllegalArgumentException( 4385 "Balancer Rejection logs are not maintained by HRegionServer"); 4386 } 4387 return getBalancerRejections(limit); 4388 default: 4389 return CompletableFuture.completedFuture(Collections.emptyList()); 4390 } 4391 } 4392 4393 @Override 4394 public CompletableFuture<Void> flushMasterStore() { 4395 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4396 return this.<Void> newMasterCaller() 4397 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4398 Void> call(controller, stub, request.build(), 4399 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4400 .call(); 4401 } 4402}