001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.Set; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentLinkedQueue; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicReference; 041import java.util.function.BiConsumer; 042import java.util.function.Consumer; 043import java.util.function.Function; 044import java.util.function.Supplier; 045import java.util.regex.Pattern; 046import java.util.stream.Collectors; 047import java.util.stream.Stream; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.CacheEvictionStats; 050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 051import org.apache.hadoop.hbase.CatalogFamilyFormat; 052import org.apache.hadoop.hbase.ClientMetaTableAccessor; 053import org.apache.hadoop.hbase.ClusterMetrics; 054import org.apache.hadoop.hbase.ClusterMetrics.Option; 055import org.apache.hadoop.hbase.ClusterMetricsBuilder; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.HRegionLocation; 059import org.apache.hadoop.hbase.NamespaceDescriptor; 060import org.apache.hadoop.hbase.RegionLocations; 061import org.apache.hadoop.hbase.RegionMetrics; 062import org.apache.hadoop.hbase.RegionMetricsBuilder; 063import org.apache.hadoop.hbase.ServerName; 064import org.apache.hadoop.hbase.TableExistsException; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.TableNotDisabledException; 067import org.apache.hadoop.hbase.TableNotEnabledException; 068import org.apache.hadoop.hbase.TableNotFoundException; 069import org.apache.hadoop.hbase.UnknownRegionException; 070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.Scan.ReadType; 074import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 075import org.apache.hadoop.hbase.client.replication.TableCFs; 076import org.apache.hadoop.hbase.client.security.SecurityCapability; 077import org.apache.hadoop.hbase.exceptions.DeserializationException; 078import org.apache.hadoop.hbase.ipc.HBaseRpcController; 079import org.apache.hadoop.hbase.net.Address; 080import org.apache.hadoop.hbase.quotas.QuotaFilter; 081import org.apache.hadoop.hbase.quotas.QuotaSettings; 082import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 083import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 084import org.apache.hadoop.hbase.replication.ReplicationException; 085import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 086import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 087import org.apache.hadoop.hbase.replication.SyncReplicationState; 088import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 090import org.apache.hadoop.hbase.security.access.Permission; 091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 092import org.apache.hadoop.hbase.security.access.UserPermission; 093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 096import org.apache.hadoop.hbase.util.Bytes; 097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 098import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 099import org.apache.hadoop.hbase.util.Pair; 100import org.apache.hadoop.hbase.util.Strings; 101import org.apache.yetus.audience.InterfaceAudience; 102import org.slf4j.Logger; 103import org.slf4j.LoggerFactory; 104 105import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 106import org.apache.hbase.thirdparty.com.google.protobuf.Message; 107import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 108import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 109import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 110import org.apache.hbase.thirdparty.io.netty.util.Timeout; 111import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 112import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 113 114import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 115import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 355import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 356 357/** 358 * The implementation of AsyncAdmin. 359 * <p> 360 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 361 * be finished inside the rpc framework thread, which means that the callbacks registered to the 362 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 363 * this class should not try to do time consuming tasks in the callbacks. 364 * @since 2.0.0 365 * @see AsyncHBaseAdmin 366 * @see AsyncConnection#getAdmin() 367 * @see AsyncConnection#getAdminBuilder() 368 */ 369@InterfaceAudience.Private 370class RawAsyncHBaseAdmin implements AsyncAdmin { 371 372 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 373 374 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 375 376 private final AsyncConnectionImpl connection; 377 378 private final HashedWheelTimer retryTimer; 379 380 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 381 382 private final long rpcTimeoutNs; 383 384 private final long operationTimeoutNs; 385 386 private final long pauseNs; 387 388 private final long pauseNsForServerOverloaded; 389 390 private final int maxAttempts; 391 392 private final int startLogErrorsCnt; 393 394 private final NonceGenerator ng; 395 396 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 397 AsyncAdminBuilderBase builder) { 398 this.connection = connection; 399 this.retryTimer = retryTimer; 400 this.metaTable = connection.getTable(META_TABLE_NAME); 401 this.rpcTimeoutNs = builder.rpcTimeoutNs; 402 this.operationTimeoutNs = builder.operationTimeoutNs; 403 this.pauseNs = builder.pauseNs; 404 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 405 LOG.warn( 406 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 407 + " the normal pause value {} ms, use the greater one instead", 408 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 409 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 410 this.pauseNsForServerOverloaded = builder.pauseNs; 411 } else { 412 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 413 } 414 this.maxAttempts = builder.maxAttempts; 415 this.startLogErrorsCnt = builder.startLogErrorsCnt; 416 this.ng = connection.getNonceGenerator(); 417 } 418 419 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 420 return this.connection.callerFactory.<T> masterRequest() 421 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 422 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 423 .pause(pauseNs, TimeUnit.NANOSECONDS) 424 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 425 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 426 } 427 428 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 429 return this.connection.callerFactory.<T> adminRequest() 430 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 431 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 432 .pause(pauseNs, TimeUnit.NANOSECONDS) 433 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 434 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 435 } 436 437 @FunctionalInterface 438 private interface MasterRpcCall<RESP, REQ> { 439 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 440 RpcCallback<RESP> done); 441 } 442 443 @FunctionalInterface 444 private interface AdminRpcCall<RESP, REQ> { 445 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 446 RpcCallback<RESP> done); 447 } 448 449 @FunctionalInterface 450 private interface Converter<D, S> { 451 D convert(S src) throws IOException; 452 } 453 454 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 455 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 456 Converter<RESP, PRESP> respConverter) { 457 CompletableFuture<RESP> future = new CompletableFuture<>(); 458 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 459 460 @Override 461 public void run(PRESP resp) { 462 if (controller.failed()) { 463 future.completeExceptionally(controller.getFailed()); 464 } else { 465 try { 466 future.complete(respConverter.convert(resp)); 467 } catch (IOException e) { 468 future.completeExceptionally(e); 469 } 470 } 471 } 472 }); 473 return future; 474 } 475 476 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 477 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 478 Converter<RESP, PRESP> respConverter) { 479 CompletableFuture<RESP> future = new CompletableFuture<>(); 480 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 481 482 @Override 483 public void run(PRESP resp) { 484 if (controller.failed()) { 485 future.completeExceptionally(controller.getFailed()); 486 } else { 487 try { 488 future.complete(respConverter.convert(resp)); 489 } catch (IOException e) { 490 future.completeExceptionally(e); 491 } 492 } 493 } 494 }); 495 return future; 496 } 497 498 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 499 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 500 ProcedureBiConsumer consumer) { 501 return procedureCall(b -> { 502 }, preq, rpcCall, respConverter, consumer); 503 } 504 505 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 506 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 507 ProcedureBiConsumer consumer) { 508 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 509 } 510 511 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 512 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 513 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 514 ProcedureBiConsumer consumer) { 515 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 516 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 517 prioritySetter.accept(builder); 518 CompletableFuture<Long> procFuture = builder.call(); 519 CompletableFuture<Void> future = waitProcedureResult(procFuture); 520 addListener(future, consumer); 521 return future; 522 } 523 524 @Override 525 public CompletableFuture<Boolean> tableExists(TableName tableName) { 526 if (TableName.isMetaTableName(tableName)) { 527 return CompletableFuture.completedFuture(true); 528 } 529 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 530 } 531 532 @Override 533 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 534 return getTableDescriptors( 535 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 536 } 537 538 /** 539 * {@link #listTableDescriptors(boolean)} 540 */ 541 @Override 542 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 543 boolean includeSysTables) { 544 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 545 + "use listTableDescriptors(boolean) instead"); 546 return getTableDescriptors( 547 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 548 } 549 550 @Override 551 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 552 Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, " 553 + "use listTableDescriptors(boolean) instead"); 554 if (tableNames.isEmpty()) { 555 return CompletableFuture.completedFuture(Collections.emptyList()); 556 } 557 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 558 } 559 560 private CompletableFuture<List<TableDescriptor>> 561 getTableDescriptors(GetTableDescriptorsRequest request) { 562 return this.<List<TableDescriptor>> newMasterCaller() 563 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 564 List<TableDescriptor>> call(controller, stub, request, 565 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 566 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 567 .call(); 568 } 569 570 @Override 571 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 572 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 573 } 574 575 @Override 576 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 577 boolean includeSysTables) { 578 Preconditions.checkNotNull(pattern, 579 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 580 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 581 } 582 583 @Override 584 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 585 return this.<List<TableName>> newMasterCaller() 586 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 587 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 588 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 589 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 590 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 591 .call(); 592 } 593 594 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 595 return this.<List<TableName>> newMasterCaller() 596 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 597 List<TableName>> call(controller, stub, request, 598 (s, c, req, done) -> s.getTableNames(c, req, done), 599 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 600 .call(); 601 } 602 603 @Override 604 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 605 return this.<List<TableDescriptor>> newMasterCaller() 606 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 607 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 608 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 609 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 610 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 611 .call(); 612 } 613 614 @Override 615 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 616 return this.<List<TableDescriptor>> newMasterCaller() 617 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 618 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 619 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 620 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 621 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 622 .call(); 623 } 624 625 @Override 626 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 627 return this.<List<TableName>> newMasterCaller() 628 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 629 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 630 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 631 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 632 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 633 .call(); 634 } 635 636 @Override 637 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 638 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 639 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 640 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 641 List<TableSchema>> call(controller, stub, 642 RequestConverter.buildGetTableDescriptorsRequest(tableName), 643 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 644 (resp) -> resp.getTableSchemaList())) 645 .call(), (tableSchemas, error) -> { 646 if (error != null) { 647 future.completeExceptionally(error); 648 return; 649 } 650 if (!tableSchemas.isEmpty()) { 651 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 652 } else { 653 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 654 } 655 }); 656 return future; 657 } 658 659 @Override 660 public CompletableFuture<Void> createTable(TableDescriptor desc) { 661 return createTable(desc.getTableName(), 662 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 663 } 664 665 @Override 666 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 667 int numRegions) { 668 try { 669 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 670 } catch (IllegalArgumentException e) { 671 return failedFuture(e); 672 } 673 } 674 675 @Override 676 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 677 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 678 + " use createTable(TableDescriptor) instead"); 679 try { 680 verifySplitKeys(splitKeys); 681 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 682 splitKeys, ng.getNonceGroup(), ng.newNonce())); 683 } catch (IllegalArgumentException e) { 684 return failedFuture(e); 685 } 686 } 687 688 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 689 Preconditions.checkNotNull(tableName, "table name is null"); 690 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 691 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 692 new CreateTableProcedureBiConsumer(tableName)); 693 } 694 695 @Override 696 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 697 return modifyTable(desc, true); 698 } 699 700 @Override 701 public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) { 702 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 703 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 704 ng.newNonce(), reopenRegions), 705 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 706 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 707 } 708 709 @Override 710 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 711 return this.<ModifyTableStoreFileTrackerRequest, 712 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 713 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 714 ng.getNonceGroup(), ng.newNonce()), 715 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 716 (resp) -> resp.getProcId(), 717 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 718 } 719 720 @Override 721 public CompletableFuture<Void> deleteTable(TableName tableName) { 722 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 723 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 724 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 725 new DeleteTableProcedureBiConsumer(tableName)); 726 } 727 728 @Override 729 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 730 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 731 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 732 ng.newNonce()), 733 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 734 new TruncateTableProcedureBiConsumer(tableName)); 735 } 736 737 @Override 738 public CompletableFuture<Void> enableTable(TableName tableName) { 739 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 740 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 741 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 742 new EnableTableProcedureBiConsumer(tableName)); 743 } 744 745 @Override 746 public CompletableFuture<Void> disableTable(TableName tableName) { 747 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 748 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 749 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 750 new DisableTableProcedureBiConsumer(tableName)); 751 } 752 753 /** 754 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 755 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 756 * targetState). 757 */ 758 private static CompletableFuture<Boolean> completeCheckTableState( 759 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 760 TableState.State targetState, TableName tableName) { 761 if (error != null) { 762 future.completeExceptionally(error); 763 } else { 764 if (tableState != null) { 765 future.complete(tableState.inStates(targetState)); 766 } else { 767 future.completeExceptionally(new TableNotFoundException(tableName)); 768 } 769 } 770 return future; 771 } 772 773 @Override 774 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 775 if (TableName.isMetaTableName(tableName)) { 776 return CompletableFuture.completedFuture(true); 777 } 778 CompletableFuture<Boolean> future = new CompletableFuture<>(); 779 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 780 (tableState, error) -> { 781 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 782 TableState.State.ENABLED, tableName); 783 }); 784 return future; 785 } 786 787 @Override 788 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 789 if (TableName.isMetaTableName(tableName)) { 790 return CompletableFuture.completedFuture(false); 791 } 792 CompletableFuture<Boolean> future = new CompletableFuture<>(); 793 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 794 (tableState, error) -> { 795 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 796 TableState.State.DISABLED, tableName); 797 }); 798 return future; 799 } 800 801 @Override 802 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 803 if (TableName.isMetaTableName(tableName)) { 804 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 805 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 806 } 807 CompletableFuture<Boolean> future = new CompletableFuture<>(); 808 addListener(isTableEnabled(tableName), (enabled, error) -> { 809 if (error != null) { 810 if (error instanceof TableNotFoundException) { 811 future.complete(false); 812 } else { 813 future.completeExceptionally(error); 814 } 815 return; 816 } 817 if (!enabled) { 818 future.complete(false); 819 } else { 820 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 821 (locations, error1) -> { 822 if (error1 != null) { 823 future.completeExceptionally(error1); 824 return; 825 } 826 List<HRegionLocation> notDeployedRegions = locations.stream() 827 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 828 if (notDeployedRegions.size() > 0) { 829 if (LOG.isDebugEnabled()) { 830 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 831 } 832 future.complete(false); 833 return; 834 } 835 future.complete(true); 836 }); 837 } 838 }); 839 return future; 840 } 841 842 @Override 843 public CompletableFuture<Void> addColumnFamily(TableName tableName, 844 ColumnFamilyDescriptor columnFamily) { 845 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 846 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 847 ng.newNonce()), 848 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 849 new AddColumnFamilyProcedureBiConsumer(tableName)); 850 } 851 852 @Override 853 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 854 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 855 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 856 ng.newNonce()), 857 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 858 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 859 } 860 861 @Override 862 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 863 ColumnFamilyDescriptor columnFamily) { 864 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 865 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 866 ng.newNonce()), 867 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 868 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 869 } 870 871 @Override 872 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 873 byte[] family, String dstSFT) { 874 return this.<ModifyColumnStoreFileTrackerRequest, 875 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 876 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 877 ng.getNonceGroup(), ng.newNonce()), 878 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 879 (resp) -> resp.getProcId(), 880 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 881 } 882 883 @Override 884 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 885 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 886 RequestConverter.buildCreateNamespaceRequest(descriptor), 887 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 888 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 889 } 890 891 @Override 892 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 893 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 894 RequestConverter.buildModifyNamespaceRequest(descriptor), 895 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 896 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 897 } 898 899 @Override 900 public CompletableFuture<Void> deleteNamespace(String name) { 901 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 902 RequestConverter.buildDeleteNamespaceRequest(name), 903 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 904 new DeleteNamespaceProcedureBiConsumer(name)); 905 } 906 907 @Override 908 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 909 return this.<NamespaceDescriptor> newMasterCaller() 910 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 911 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 912 RequestConverter.buildGetNamespaceDescriptorRequest(name), 913 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 914 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 915 .call(); 916 } 917 918 @Override 919 public CompletableFuture<List<String>> listNamespaces() { 920 return this.<List<String>> newMasterCaller() 921 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 922 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 923 (s, c, req, done) -> s.listNamespaces(c, req, done), 924 (resp) -> resp.getNamespaceNameList())) 925 .call(); 926 } 927 928 @Override 929 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 930 return this.<List<NamespaceDescriptor>> newMasterCaller() 931 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 932 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 933 ListNamespaceDescriptorsRequest.newBuilder().build(), 934 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 935 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 936 .call(); 937 } 938 939 @Override 940 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 941 return this.<List<RegionInfo>> newAdminCaller() 942 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 943 List<RegionInfo>> adminCall(controller, stub, 944 RequestConverter.buildGetOnlineRegionRequest(), 945 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 946 resp -> ProtobufUtil.getRegionInfos(resp))) 947 .serverName(serverName).call(); 948 } 949 950 @Override 951 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 952 if (tableName.equals(META_TABLE_NAME)) { 953 return connection.registry.getMetaRegionLocations() 954 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 955 .collect(Collectors.toList())); 956 } else { 957 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 958 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 959 } 960 } 961 962 @Override 963 public CompletableFuture<Void> flush(TableName tableName) { 964 return flush(tableName, Collections.emptyList()); 965 } 966 967 @Override 968 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 969 return flush(tableName, Collections.singletonList(columnFamily)); 970 } 971 972 @Override 973 public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { 974 // This is for keeping compatibility with old implementation. 975 // If the server version is lower than the client version, it's possible that the 976 // flushTable method is not present in the server side, if so, we need to fall back 977 // to the old implementation. 978 List<byte[]> columnFamilies = columnFamilyList.stream() 979 .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); 980 FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, 981 ng.getNonceGroup(), ng.newNonce()); 982 CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( 983 tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), 984 (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); 985 CompletableFuture<Void> future = new CompletableFuture<>(); 986 addListener(procFuture, (ret, error) -> { 987 if (error != null) { 988 if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) { 989 future.completeExceptionally(error); 990 } else if (error instanceof DoNotRetryIOException) { 991 // usually this is caused by the method is not present on the server or 992 // the hbase hadoop version does not match the running hadoop version. 993 // if that happens, we need fall back to the old flush implementation. 994 LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error); 995 legacyFlush(future, tableName, columnFamilies); 996 } else { 997 future.completeExceptionally(error); 998 } 999 } else { 1000 future.complete(ret); 1001 } 1002 }); 1003 return future; 1004 } 1005 1006 private void legacyFlush(CompletableFuture<Void> future, TableName tableName, 1007 List<byte[]> columnFamilies) { 1008 addListener(tableExists(tableName), (exists, err) -> { 1009 if (err != null) { 1010 future.completeExceptionally(err); 1011 } else if (!exists) { 1012 future.completeExceptionally(new TableNotFoundException(tableName)); 1013 } else { 1014 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 1015 if (err2 != null) { 1016 future.completeExceptionally(err2); 1017 } else if (!tableEnabled) { 1018 future.completeExceptionally(new TableNotEnabledException(tableName)); 1019 } else { 1020 Map<String, String> props = new HashMap<>(); 1021 if (columnFamilies != null && !columnFamilies.isEmpty()) { 1022 props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER 1023 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); 1024 } 1025 addListener( 1026 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 1027 (ret, err3) -> { 1028 if (err3 != null) { 1029 future.completeExceptionally(err3); 1030 } else { 1031 future.complete(ret); 1032 } 1033 }); 1034 } 1035 }); 1036 } 1037 }); 1038 } 1039 1040 @Override 1041 public CompletableFuture<Void> flushRegion(byte[] regionName) { 1042 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 1043 }); 1044 } 1045 1046 @Override 1047 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 1048 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1049 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 1050 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 1051 }); 1052 } 1053 1054 /** 1055 * This method is for internal use only, where we need the response of the flush. 1056 * <p/> 1057 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1058 * API. 1059 */ 1060 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1061 boolean writeFlushWALMarker) { 1062 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1063 addListener(getRegionLocation(regionName), (location, err) -> { 1064 if (err != null) { 1065 future.completeExceptionally(err); 1066 return; 1067 } 1068 ServerName serverName = location.getServerName(); 1069 if (serverName == null) { 1070 future 1071 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1072 return; 1073 } 1074 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1075 (ret, err2) -> { 1076 if (err2 != null) { 1077 future.completeExceptionally(err2); 1078 } else { 1079 future.complete(ret); 1080 } 1081 }); 1082 }); 1083 return future; 1084 } 1085 1086 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1087 byte[] columnFamily, boolean writeFlushWALMarker) { 1088 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1089 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1090 FlushRegionResponse> adminCall(controller, stub, 1091 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1092 writeFlushWALMarker), 1093 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1094 .call(); 1095 } 1096 1097 @Override 1098 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1099 CompletableFuture<Void> future = new CompletableFuture<>(); 1100 addListener(getRegions(sn), (hRegionInfos, err) -> { 1101 if (err != null) { 1102 future.completeExceptionally(err); 1103 return; 1104 } 1105 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1106 if (hRegionInfos != null) { 1107 hRegionInfos 1108 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1109 }))); 1110 } 1111 addListener(CompletableFuture.allOf( 1112 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1113 if (err2 != null) { 1114 future.completeExceptionally(err2); 1115 } else { 1116 future.complete(ret); 1117 } 1118 }); 1119 }); 1120 return future; 1121 } 1122 1123 @Override 1124 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1125 return compact(tableName, null, false, compactType); 1126 } 1127 1128 @Override 1129 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1130 CompactType compactType) { 1131 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1132 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1133 return compact(tableName, columnFamily, false, compactType); 1134 } 1135 1136 @Override 1137 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1138 return compactRegion(regionName, null, false); 1139 } 1140 1141 @Override 1142 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1143 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1144 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1145 return compactRegion(regionName, columnFamily, false); 1146 } 1147 1148 @Override 1149 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1150 return compact(tableName, null, true, compactType); 1151 } 1152 1153 @Override 1154 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1155 CompactType compactType) { 1156 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1157 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1158 return compact(tableName, columnFamily, true, compactType); 1159 } 1160 1161 @Override 1162 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1163 return compactRegion(regionName, null, true); 1164 } 1165 1166 @Override 1167 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1168 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1169 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1170 return compactRegion(regionName, columnFamily, true); 1171 } 1172 1173 @Override 1174 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1175 return compactRegionServer(sn, false); 1176 } 1177 1178 @Override 1179 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1180 return compactRegionServer(sn, true); 1181 } 1182 1183 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1184 CompletableFuture<Void> future = new CompletableFuture<>(); 1185 addListener(getRegions(sn), (hRegionInfos, err) -> { 1186 if (err != null) { 1187 future.completeExceptionally(err); 1188 return; 1189 } 1190 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1191 if (hRegionInfos != null) { 1192 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1193 } 1194 addListener(CompletableFuture.allOf( 1195 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1196 if (err2 != null) { 1197 future.completeExceptionally(err2); 1198 } else { 1199 future.complete(ret); 1200 } 1201 }); 1202 }); 1203 return future; 1204 } 1205 1206 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1207 boolean major) { 1208 CompletableFuture<Void> future = new CompletableFuture<>(); 1209 addListener(getRegionLocation(regionName), (location, err) -> { 1210 if (err != null) { 1211 future.completeExceptionally(err); 1212 return; 1213 } 1214 ServerName serverName = location.getServerName(); 1215 if (serverName == null) { 1216 future 1217 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1218 return; 1219 } 1220 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1221 (ret, err2) -> { 1222 if (err2 != null) { 1223 future.completeExceptionally(err2); 1224 } else { 1225 future.complete(ret); 1226 } 1227 }); 1228 }); 1229 return future; 1230 } 1231 1232 /** 1233 * List all region locations for the specific table. 1234 */ 1235 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1236 if (TableName.META_TABLE_NAME.equals(tableName)) { 1237 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1238 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1239 if (err != null) { 1240 future.completeExceptionally(err); 1241 } else if ( 1242 metaRegions == null || metaRegions.isEmpty() 1243 || metaRegions.getDefaultRegionLocation() == null 1244 ) { 1245 future.completeExceptionally(new IOException("meta region does not found")); 1246 } else { 1247 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1248 } 1249 }); 1250 return future; 1251 } else { 1252 // For non-meta table, we fetch all locations by scanning hbase:meta table 1253 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1254 } 1255 } 1256 1257 /** 1258 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1259 */ 1260 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1261 CompactType compactType) { 1262 CompletableFuture<Void> future = new CompletableFuture<>(); 1263 1264 switch (compactType) { 1265 case MOB: 1266 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1267 if (err != null) { 1268 future.completeExceptionally(err); 1269 return; 1270 } 1271 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1272 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1273 if (err2 != null) { 1274 future.completeExceptionally(err2); 1275 } else { 1276 future.complete(ret); 1277 } 1278 }); 1279 }); 1280 break; 1281 case NORMAL: 1282 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1283 if (err != null) { 1284 future.completeExceptionally(err); 1285 return; 1286 } 1287 if (locations == null || locations.isEmpty()) { 1288 future.completeExceptionally(new TableNotFoundException(tableName)); 1289 } 1290 CompletableFuture<?>[] compactFutures = 1291 locations.stream().filter(l -> l.getRegion() != null) 1292 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1293 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1294 .toArray(CompletableFuture<?>[]::new); 1295 // future complete unless all of the compact futures are completed. 1296 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1297 if (err2 != null) { 1298 future.completeExceptionally(err2); 1299 } else { 1300 future.complete(ret); 1301 } 1302 }); 1303 }); 1304 break; 1305 default: 1306 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1307 } 1308 return future; 1309 } 1310 1311 /** 1312 * Compact the region at specific region server. 1313 */ 1314 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1315 final boolean major, byte[] columnFamily) { 1316 return this.<Void> newAdminCaller().serverName(sn) 1317 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1318 Void> adminCall(controller, stub, 1319 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1320 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1321 .call(); 1322 } 1323 1324 private byte[] toEncodeRegionName(byte[] regionName) { 1325 return RegionInfo.isEncodedRegionName(regionName) 1326 ? regionName 1327 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1328 } 1329 1330 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1331 CompletableFuture<TableName> result) { 1332 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1333 if (err != null) { 1334 result.completeExceptionally(err); 1335 return; 1336 } 1337 RegionInfo regionInfo = location.getRegion(); 1338 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1339 result.completeExceptionally( 1340 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1341 return; 1342 } 1343 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1344 if (!tableName.get().equals(regionInfo.getTable())) { 1345 // tables of this two region should be same. 1346 result.completeExceptionally( 1347 new IllegalArgumentException("Cannot merge regions from two different tables " 1348 + tableName.get() + " and " + regionInfo.getTable())); 1349 } else { 1350 result.complete(tableName.get()); 1351 } 1352 } 1353 }); 1354 } 1355 1356 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1357 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1358 CompletableFuture<TableName> future = new CompletableFuture<>(); 1359 for (byte[] encodedRegionName : encodedRegionNames) { 1360 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1361 } 1362 return future; 1363 } 1364 1365 @Override 1366 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1367 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1368 } 1369 1370 @Override 1371 public CompletableFuture<Boolean> isMergeEnabled() { 1372 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1373 } 1374 1375 @Override 1376 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1377 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1378 } 1379 1380 @Override 1381 public CompletableFuture<Boolean> isSplitEnabled() { 1382 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1383 } 1384 1385 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1386 MasterSwitchType switchType) { 1387 SetSplitOrMergeEnabledRequest request = 1388 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1389 return this.<Boolean> newMasterCaller() 1390 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1391 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1392 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1393 (resp) -> resp.getPrevValueList().get(0))) 1394 .call(); 1395 } 1396 1397 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1398 IsSplitOrMergeEnabledRequest request = 1399 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1400 return this.<Boolean> newMasterCaller() 1401 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1402 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1403 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1404 .call(); 1405 } 1406 1407 @Override 1408 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1409 if (nameOfRegionsToMerge.size() < 2) { 1410 return failedFuture(new IllegalArgumentException( 1411 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1412 } 1413 CompletableFuture<Void> future = new CompletableFuture<>(); 1414 byte[][] encodedNameOfRegionsToMerge = 1415 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1416 1417 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1418 if (err != null) { 1419 future.completeExceptionally(err); 1420 return; 1421 } 1422 1423 final MergeTableRegionsRequest request; 1424 try { 1425 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1426 forcible, ng.getNonceGroup(), ng.newNonce()); 1427 } catch (DeserializationException e) { 1428 future.completeExceptionally(e); 1429 return; 1430 } 1431 1432 addListener( 1433 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1434 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1435 (ret, err2) -> { 1436 if (err2 != null) { 1437 future.completeExceptionally(err2); 1438 } else { 1439 future.complete(ret); 1440 } 1441 }); 1442 }); 1443 return future; 1444 } 1445 1446 @Override 1447 public CompletableFuture<Void> split(TableName tableName) { 1448 CompletableFuture<Void> future = new CompletableFuture<>(); 1449 addListener(tableExists(tableName), (exist, error) -> { 1450 if (error != null) { 1451 future.completeExceptionally(error); 1452 return; 1453 } 1454 if (!exist) { 1455 future.completeExceptionally(new TableNotFoundException(tableName)); 1456 return; 1457 } 1458 addListener(metaTable 1459 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1460 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1461 ClientMetaTableAccessor.QueryType.REGION)) 1462 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1463 ClientMetaTableAccessor.QueryType.REGION))), 1464 (results, err2) -> { 1465 if (err2 != null) { 1466 future.completeExceptionally(err2); 1467 return; 1468 } 1469 if (results != null && !results.isEmpty()) { 1470 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1471 for (Result r : results) { 1472 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1473 continue; 1474 } 1475 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1476 if (rl != null) { 1477 for (HRegionLocation h : rl.getRegionLocations()) { 1478 if (h != null && h.getServerName() != null) { 1479 RegionInfo hri = h.getRegion(); 1480 if ( 1481 hri == null || hri.isSplitParent() 1482 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1483 ) { 1484 continue; 1485 } 1486 splitFutures.add(split(hri, null)); 1487 } 1488 } 1489 } 1490 } 1491 addListener( 1492 CompletableFuture 1493 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1494 (ret, exception) -> { 1495 if (exception != null) { 1496 future.completeExceptionally(exception); 1497 return; 1498 } 1499 future.complete(ret); 1500 }); 1501 } else { 1502 future.complete(null); 1503 } 1504 }); 1505 }); 1506 return future; 1507 } 1508 1509 @Override 1510 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1511 CompletableFuture<Void> result = new CompletableFuture<>(); 1512 if (splitPoint == null) { 1513 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1514 } 1515 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1516 (loc, err) -> { 1517 if (err != null) { 1518 result.completeExceptionally(err); 1519 } else if (loc == null || loc.getRegion() == null) { 1520 result.completeExceptionally(new IllegalArgumentException( 1521 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1522 } else { 1523 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1524 if (err2 != null) { 1525 result.completeExceptionally(err2); 1526 } else { 1527 result.complete(ret); 1528 } 1529 1530 }); 1531 } 1532 }); 1533 return result; 1534 } 1535 1536 @Override 1537 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1538 CompletableFuture<Void> future = new CompletableFuture<>(); 1539 addListener(getRegionLocation(regionName), (location, err) -> { 1540 if (err != null) { 1541 future.completeExceptionally(err); 1542 return; 1543 } 1544 RegionInfo regionInfo = location.getRegion(); 1545 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1546 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1547 + "Replicas are auto-split when their primary is split.")); 1548 return; 1549 } 1550 ServerName serverName = location.getServerName(); 1551 if (serverName == null) { 1552 future 1553 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1554 return; 1555 } 1556 addListener(split(regionInfo, null), (ret, err2) -> { 1557 if (err2 != null) { 1558 future.completeExceptionally(err2); 1559 } else { 1560 future.complete(ret); 1561 } 1562 }); 1563 }); 1564 return future; 1565 } 1566 1567 @Override 1568 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1569 Preconditions.checkNotNull(splitPoint, 1570 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1571 CompletableFuture<Void> future = new CompletableFuture<>(); 1572 addListener(getRegionLocation(regionName), (location, err) -> { 1573 if (err != null) { 1574 future.completeExceptionally(err); 1575 return; 1576 } 1577 RegionInfo regionInfo = location.getRegion(); 1578 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1579 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1580 + "Replicas are auto-split when their primary is split.")); 1581 return; 1582 } 1583 ServerName serverName = location.getServerName(); 1584 if (serverName == null) { 1585 future 1586 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1587 return; 1588 } 1589 if ( 1590 regionInfo.getStartKey() != null 1591 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1592 ) { 1593 future.completeExceptionally( 1594 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1595 return; 1596 } 1597 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1598 if (err2 != null) { 1599 future.completeExceptionally(err2); 1600 } else { 1601 future.complete(ret); 1602 } 1603 }); 1604 }); 1605 return future; 1606 } 1607 1608 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1609 CompletableFuture<Void> future = new CompletableFuture<>(); 1610 TableName tableName = hri.getTable(); 1611 final SplitTableRegionRequest request; 1612 try { 1613 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1614 ng.newNonce()); 1615 } catch (DeserializationException e) { 1616 future.completeExceptionally(e); 1617 return future; 1618 } 1619 1620 addListener( 1621 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1622 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1623 (ret, err2) -> { 1624 if (err2 != null) { 1625 future.completeExceptionally(err2); 1626 } else { 1627 future.complete(ret); 1628 } 1629 }); 1630 return future; 1631 } 1632 1633 @Override 1634 public CompletableFuture<Void> truncateRegion(byte[] regionName) { 1635 CompletableFuture<Void> future = new CompletableFuture<>(); 1636 addListener(getRegionLocation(regionName), (location, err) -> { 1637 if (err != null) { 1638 future.completeExceptionally(err); 1639 return; 1640 } 1641 RegionInfo regionInfo = location.getRegion(); 1642 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1643 future.completeExceptionally(new IllegalArgumentException( 1644 "Can't truncate replicas directly.Replicas are auto-truncated " 1645 + "when their primary is truncated.")); 1646 return; 1647 } 1648 ServerName serverName = location.getServerName(); 1649 if (serverName == null) { 1650 future 1651 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1652 return; 1653 } 1654 addListener(truncateRegion(regionInfo), (ret, err2) -> { 1655 if (err2 != null) { 1656 future.completeExceptionally(err2); 1657 } else { 1658 future.complete(ret); 1659 } 1660 }); 1661 }); 1662 return future; 1663 } 1664 1665 private CompletableFuture<Void> truncateRegion(final RegionInfo hri) { 1666 CompletableFuture<Void> future = new CompletableFuture<>(); 1667 TableName tableName = hri.getTable(); 1668 final MasterProtos.TruncateRegionRequest request; 1669 try { 1670 request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce()); 1671 } catch (DeserializationException e) { 1672 future.completeExceptionally(e); 1673 return future; 1674 } 1675 addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion, 1676 MasterProtos.TruncateRegionResponse::getProcId, 1677 new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1678 if (err2 != null) { 1679 future.completeExceptionally(err2); 1680 } else { 1681 future.complete(ret); 1682 } 1683 }); 1684 return future; 1685 } 1686 1687 @Override 1688 public CompletableFuture<Void> assign(byte[] regionName) { 1689 CompletableFuture<Void> future = new CompletableFuture<>(); 1690 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1691 if (err != null) { 1692 future.completeExceptionally(err); 1693 return; 1694 } 1695 addListener( 1696 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1697 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1698 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1699 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1700 .call(), 1701 (ret, err2) -> { 1702 if (err2 != null) { 1703 future.completeExceptionally(err2); 1704 } else { 1705 future.complete(ret); 1706 } 1707 }); 1708 }); 1709 return future; 1710 } 1711 1712 @Override 1713 public CompletableFuture<Void> unassign(byte[] regionName) { 1714 CompletableFuture<Void> future = new CompletableFuture<>(); 1715 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1716 if (err != null) { 1717 future.completeExceptionally(err); 1718 return; 1719 } 1720 addListener( 1721 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1722 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1723 Void> call(controller, stub, 1724 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1725 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1726 .call(), 1727 (ret, err2) -> { 1728 if (err2 != null) { 1729 future.completeExceptionally(err2); 1730 } else { 1731 future.complete(ret); 1732 } 1733 }); 1734 }); 1735 return future; 1736 } 1737 1738 @Override 1739 public CompletableFuture<Void> offline(byte[] regionName) { 1740 CompletableFuture<Void> future = new CompletableFuture<>(); 1741 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1742 if (err != null) { 1743 future.completeExceptionally(err); 1744 return; 1745 } 1746 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1747 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1748 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1749 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1750 .call(), (ret, err2) -> { 1751 if (err2 != null) { 1752 future.completeExceptionally(err2); 1753 } else { 1754 future.complete(ret); 1755 } 1756 }); 1757 }); 1758 return future; 1759 } 1760 1761 @Override 1762 public CompletableFuture<Void> move(byte[] regionName) { 1763 CompletableFuture<Void> future = new CompletableFuture<>(); 1764 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1765 if (err != null) { 1766 future.completeExceptionally(err); 1767 return; 1768 } 1769 addListener( 1770 moveRegion(regionInfo, 1771 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1772 (ret, err2) -> { 1773 if (err2 != null) { 1774 future.completeExceptionally(err2); 1775 } else { 1776 future.complete(ret); 1777 } 1778 }); 1779 }); 1780 return future; 1781 } 1782 1783 @Override 1784 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1785 Preconditions.checkNotNull(destServerName, 1786 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1787 CompletableFuture<Void> future = new CompletableFuture<>(); 1788 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1789 if (err != null) { 1790 future.completeExceptionally(err); 1791 return; 1792 } 1793 addListener( 1794 moveRegion(regionInfo, RequestConverter 1795 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1796 (ret, err2) -> { 1797 if (err2 != null) { 1798 future.completeExceptionally(err2); 1799 } else { 1800 future.complete(ret); 1801 } 1802 }); 1803 }); 1804 return future; 1805 } 1806 1807 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1808 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1809 .action( 1810 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1811 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1812 .call(); 1813 } 1814 1815 @Override 1816 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1817 return this.<Void> newMasterCaller() 1818 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1819 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1820 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1821 .call(); 1822 } 1823 1824 @Override 1825 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1826 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1827 Scan scan = QuotaTableUtil.makeScan(filter); 1828 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1829 new AdvancedScanResultConsumer() { 1830 List<QuotaSettings> settings = new ArrayList<>(); 1831 1832 @Override 1833 public void onNext(Result[] results, ScanController controller) { 1834 for (Result result : results) { 1835 try { 1836 QuotaTableUtil.parseResultToCollection(result, settings); 1837 } catch (IOException e) { 1838 controller.terminate(); 1839 future.completeExceptionally(e); 1840 } 1841 } 1842 } 1843 1844 @Override 1845 public void onError(Throwable error) { 1846 future.completeExceptionally(error); 1847 } 1848 1849 @Override 1850 public void onComplete() { 1851 future.complete(settings); 1852 } 1853 }); 1854 return future; 1855 } 1856 1857 @Override 1858 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1859 boolean enabled) { 1860 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1861 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1862 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1863 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1864 } 1865 1866 @Override 1867 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1868 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1869 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1870 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1871 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1872 } 1873 1874 @Override 1875 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1876 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1877 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1878 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1879 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1880 } 1881 1882 @Override 1883 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1884 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1885 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1886 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1887 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1888 } 1889 1890 @Override 1891 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1892 return this.<ReplicationPeerConfig> newMasterCaller() 1893 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1894 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1895 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1896 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1897 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1898 .call(); 1899 } 1900 1901 @Override 1902 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1903 ReplicationPeerConfig peerConfig) { 1904 return this.<UpdateReplicationPeerConfigRequest, 1905 UpdateReplicationPeerConfigResponse> procedureCall( 1906 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1907 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1908 (resp) -> resp.getProcId(), 1909 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1910 } 1911 1912 @Override 1913 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1914 SyncReplicationState clusterState) { 1915 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1916 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1917 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1918 clusterState), 1919 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1920 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1921 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1922 } 1923 1924 @Override 1925 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1926 Map<TableName, List<String>> tableCfs) { 1927 if (tableCfs == null) { 1928 return failedFuture(new ReplicationException("tableCfs is null")); 1929 } 1930 1931 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1932 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1933 if (!completeExceptionally(future, error)) { 1934 ReplicationPeerConfig newPeerConfig = 1935 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1936 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1937 if (!completeExceptionally(future, error)) { 1938 future.complete(result); 1939 } 1940 }); 1941 } 1942 }); 1943 return future; 1944 } 1945 1946 @Override 1947 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1948 Map<TableName, List<String>> tableCfs) { 1949 if (tableCfs == null) { 1950 return failedFuture(new ReplicationException("tableCfs is null")); 1951 } 1952 1953 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1954 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1955 if (!completeExceptionally(future, error)) { 1956 ReplicationPeerConfig newPeerConfig = null; 1957 try { 1958 newPeerConfig = ReplicationPeerConfigUtil 1959 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1960 } catch (ReplicationException e) { 1961 future.completeExceptionally(e); 1962 return; 1963 } 1964 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1965 if (!completeExceptionally(future, error)) { 1966 future.complete(result); 1967 } 1968 }); 1969 } 1970 }); 1971 return future; 1972 } 1973 1974 @Override 1975 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1976 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1977 } 1978 1979 @Override 1980 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1981 Preconditions.checkNotNull(pattern, 1982 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1983 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1984 } 1985 1986 private CompletableFuture<List<ReplicationPeerDescription>> 1987 listReplicationPeers(ListReplicationPeersRequest request) { 1988 return this.<List<ReplicationPeerDescription>> newMasterCaller() 1989 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1990 List<ReplicationPeerDescription>> call(controller, stub, request, 1991 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1992 (resp) -> resp.getPeerDescList().stream() 1993 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1994 .collect(Collectors.toList()))) 1995 .call(); 1996 } 1997 1998 @Override 1999 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 2000 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 2001 addListener(listTableDescriptors(), (tables, error) -> { 2002 if (!completeExceptionally(future, error)) { 2003 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 2004 tables.forEach(table -> { 2005 Map<String, Integer> cfs = new HashMap<>(); 2006 Stream.of(table.getColumnFamilies()) 2007 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 2008 .forEach(column -> { 2009 cfs.put(column.getNameAsString(), column.getScope()); 2010 }); 2011 if (!cfs.isEmpty()) { 2012 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 2013 } 2014 }); 2015 future.complete(replicatedTableCFs); 2016 } 2017 }); 2018 return future; 2019 } 2020 2021 @Override 2022 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 2023 SnapshotProtos.SnapshotDescription snapshot = 2024 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2025 try { 2026 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2027 } catch (IllegalArgumentException e) { 2028 return failedFuture(e); 2029 } 2030 CompletableFuture<Void> future = new CompletableFuture<>(); 2031 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2032 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 2033 addListener(this.<SnapshotResponse> newMasterCaller() 2034 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 2035 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 2036 .call(), (resp, err) -> { 2037 if (err != null) { 2038 future.completeExceptionally(err); 2039 return; 2040 } 2041 waitSnapshotFinish(snapshotDesc, future, resp); 2042 }); 2043 return future; 2044 } 2045 2046 // This is for keeping compatibility with old implementation. 2047 // If there is a procId field in the response, then the snapshot will be operated with a 2048 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 2049 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 2050 SnapshotResponse resp) { 2051 if (resp.hasProcId()) { 2052 getProcedureResult(resp.getProcId(), future, 0); 2053 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 2054 } else { 2055 long expectedTimeout = resp.getExpectedTimeout(); 2056 TimerTask pollingTask = new TimerTask() { 2057 int tries = 0; 2058 long startTime = EnvironmentEdgeManager.currentTime(); 2059 long endTime = startTime + expectedTimeout; 2060 long maxPauseTime = expectedTimeout / maxAttempts; 2061 2062 @Override 2063 public void run(Timeout timeout) throws Exception { 2064 if (EnvironmentEdgeManager.currentTime() < endTime) { 2065 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 2066 if (err2 != null) { 2067 future.completeExceptionally(err2); 2068 } else if (done) { 2069 future.complete(null); 2070 } else { 2071 // retry again after pauseTime. 2072 long pauseTime = 2073 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2074 pauseTime = Math.min(pauseTime, maxPauseTime); 2075 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 2076 } 2077 }); 2078 } else { 2079 future 2080 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 2081 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 2082 } 2083 } 2084 }; 2085 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2086 } 2087 } 2088 2089 @Override 2090 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 2091 return this.<Boolean> newMasterCaller() 2092 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 2093 Boolean> call(controller, stub, 2094 IsSnapshotDoneRequest.newBuilder() 2095 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2096 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 2097 .call(); 2098 } 2099 2100 @Override 2101 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 2102 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2103 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2104 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2105 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2106 } 2107 2108 @Override 2109 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2110 boolean restoreAcl) { 2111 CompletableFuture<Void> future = new CompletableFuture<>(); 2112 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2113 if (err != null) { 2114 future.completeExceptionally(err); 2115 return; 2116 } 2117 TableName tableName = null; 2118 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2119 for (SnapshotDescription snap : snapshotDescriptions) { 2120 if (snap.getName().equals(snapshotName)) { 2121 tableName = snap.getTableName(); 2122 break; 2123 } 2124 } 2125 } 2126 if (tableName == null) { 2127 future.completeExceptionally(new RestoreSnapshotException( 2128 "Unable to find the table name for snapshot=" + snapshotName)); 2129 return; 2130 } 2131 final TableName finalTableName = tableName; 2132 addListener(tableExists(finalTableName), (exists, err2) -> { 2133 if (err2 != null) { 2134 future.completeExceptionally(err2); 2135 } else if (!exists) { 2136 // if table does not exist, then just clone snapshot into new table. 2137 completeConditionalOnFuture(future, 2138 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2139 } else { 2140 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2141 if (err4 != null) { 2142 future.completeExceptionally(err4); 2143 } else if (!disabled) { 2144 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2145 } else { 2146 completeConditionalOnFuture(future, 2147 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2148 } 2149 }); 2150 } 2151 }); 2152 }); 2153 return future; 2154 } 2155 2156 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2157 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2158 if (takeFailSafeSnapshot) { 2159 CompletableFuture<Void> future = new CompletableFuture<>(); 2160 // Step.1 Take a snapshot of the current state 2161 String failSafeSnapshotSnapshotNameFormat = 2162 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2163 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2164 final String failSafeSnapshotSnapshotName = 2165 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2166 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2167 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2168 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2169 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2170 if (err != null) { 2171 future.completeExceptionally(err); 2172 } else { 2173 // Step.2 Restore snapshot 2174 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2175 (void2, err2) -> { 2176 if (err2 != null) { 2177 // Step.3.a Something went wrong during the restore and try to rollback. 2178 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2179 restoreAcl, null), (void3, err3) -> { 2180 if (err3 != null) { 2181 future.completeExceptionally(err3); 2182 } else { 2183 String msg = 2184 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" 2185 + failSafeSnapshotSnapshotName + " succeeded."; 2186 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2187 } 2188 }); 2189 } else { 2190 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 2191 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2192 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2193 if (err3 != null) { 2194 LOG.error( 2195 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2196 err3); 2197 } 2198 future.complete(ret3); 2199 }); 2200 } 2201 }); 2202 } 2203 }); 2204 return future; 2205 } else { 2206 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2207 } 2208 } 2209 2210 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2211 CompletableFuture<T> parentFuture) { 2212 addListener(parentFuture, (res, err) -> { 2213 if (err != null) { 2214 dependentFuture.completeExceptionally(err); 2215 } else { 2216 dependentFuture.complete(res); 2217 } 2218 }); 2219 } 2220 2221 @Override 2222 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2223 boolean restoreAcl, String customSFT) { 2224 CompletableFuture<Void> future = new CompletableFuture<>(); 2225 addListener(tableExists(tableName), (exists, err) -> { 2226 if (err != null) { 2227 future.completeExceptionally(err); 2228 } else if (exists) { 2229 future.completeExceptionally(new TableExistsException(tableName)); 2230 } else { 2231 completeConditionalOnFuture(future, 2232 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2233 } 2234 }); 2235 return future; 2236 } 2237 2238 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2239 boolean restoreAcl, String customSFT) { 2240 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2241 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2242 try { 2243 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2244 } catch (IllegalArgumentException e) { 2245 return failedFuture(e); 2246 } 2247 RestoreSnapshotRequest.Builder builder = 2248 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2249 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2250 if (customSFT != null) { 2251 builder.setCustomSFT(customSFT); 2252 } 2253 return waitProcedureResult(this.<Long> newMasterCaller() 2254 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2255 Long> call(controller, stub, builder.build(), 2256 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2257 .call()); 2258 } 2259 2260 @Override 2261 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2262 return getCompletedSnapshots(null); 2263 } 2264 2265 @Override 2266 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2267 Preconditions.checkNotNull(pattern, 2268 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2269 return getCompletedSnapshots(pattern); 2270 } 2271 2272 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2273 return this.<List<SnapshotDescription>> newMasterCaller() 2274 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2275 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2276 GetCompletedSnapshotsRequest.newBuilder().build(), 2277 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2278 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2279 .call(); 2280 } 2281 2282 @Override 2283 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2284 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2285 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2286 return getCompletedSnapshots(tableNamePattern, null); 2287 } 2288 2289 @Override 2290 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2291 Pattern snapshotNamePattern) { 2292 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2293 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2294 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2295 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2296 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2297 } 2298 2299 private CompletableFuture<List<SnapshotDescription>> 2300 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2301 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2302 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2303 if (err != null) { 2304 future.completeExceptionally(err); 2305 return; 2306 } 2307 if (tableNames == null || tableNames.size() <= 0) { 2308 future.complete(Collections.emptyList()); 2309 return; 2310 } 2311 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2312 if (err2 != null) { 2313 future.completeExceptionally(err2); 2314 return; 2315 } 2316 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2317 future.complete(Collections.emptyList()); 2318 return; 2319 } 2320 future.complete(snapshotDescList.stream() 2321 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2322 .collect(Collectors.toList())); 2323 }); 2324 }); 2325 return future; 2326 } 2327 2328 @Override 2329 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2330 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2331 } 2332 2333 @Override 2334 public CompletableFuture<Void> deleteSnapshots() { 2335 return internalDeleteSnapshots(null, null); 2336 } 2337 2338 @Override 2339 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2340 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2341 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2342 return internalDeleteSnapshots(null, snapshotNamePattern); 2343 } 2344 2345 @Override 2346 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2347 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2348 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2349 return internalDeleteSnapshots(tableNamePattern, null); 2350 } 2351 2352 @Override 2353 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2354 Pattern snapshotNamePattern) { 2355 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2356 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2357 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2358 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2359 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2360 } 2361 2362 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2363 Pattern snapshotNamePattern) { 2364 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2365 if (tableNamePattern == null) { 2366 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2367 } else { 2368 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2369 } 2370 CompletableFuture<Void> future = new CompletableFuture<>(); 2371 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2372 if (err != null) { 2373 future.completeExceptionally(err); 2374 return; 2375 } 2376 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2377 future.complete(null); 2378 return; 2379 } 2380 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2381 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2382 if (e != null) { 2383 future.completeExceptionally(e); 2384 } else { 2385 future.complete(v); 2386 } 2387 }); 2388 }); 2389 return future; 2390 } 2391 2392 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2393 return this.<Void> newMasterCaller() 2394 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2395 controller, stub, 2396 DeleteSnapshotRequest.newBuilder() 2397 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2398 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2399 .call(); 2400 } 2401 2402 @Override 2403 public CompletableFuture<Void> execProcedure(String signature, String instance, 2404 Map<String, String> props) { 2405 CompletableFuture<Void> future = new CompletableFuture<>(); 2406 ProcedureDescription procDesc = 2407 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2408 addListener(this.<Long> newMasterCaller() 2409 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2410 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2411 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2412 .call(), (expectedTimeout, err) -> { 2413 if (err != null) { 2414 future.completeExceptionally(err); 2415 return; 2416 } 2417 TimerTask pollingTask = new TimerTask() { 2418 int tries = 0; 2419 long startTime = EnvironmentEdgeManager.currentTime(); 2420 long endTime = startTime + expectedTimeout; 2421 long maxPauseTime = expectedTimeout / maxAttempts; 2422 2423 @Override 2424 public void run(Timeout timeout) throws Exception { 2425 if (EnvironmentEdgeManager.currentTime() < endTime) { 2426 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2427 if (err2 != null) { 2428 future.completeExceptionally(err2); 2429 return; 2430 } 2431 if (done) { 2432 future.complete(null); 2433 } else { 2434 // retry again after pauseTime. 2435 long pauseTime = 2436 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2437 pauseTime = Math.min(pauseTime, maxPauseTime); 2438 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2439 TimeUnit.MICROSECONDS); 2440 } 2441 }); 2442 } else { 2443 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2444 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2445 } 2446 } 2447 }; 2448 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2449 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2450 }); 2451 return future; 2452 } 2453 2454 @Override 2455 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2456 Map<String, String> props) { 2457 ProcedureDescription proDesc = 2458 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2459 return this.<byte[]> newMasterCaller() 2460 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2461 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2462 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2463 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2464 .call(); 2465 } 2466 2467 @Override 2468 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2469 Map<String, String> props) { 2470 ProcedureDescription proDesc = 2471 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2472 return this.<Boolean> newMasterCaller() 2473 .action( 2474 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2475 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2476 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2477 .call(); 2478 } 2479 2480 @Override 2481 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2482 return this.<Boolean> newMasterCaller().action( 2483 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2484 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2485 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2486 .call(); 2487 } 2488 2489 @Override 2490 public CompletableFuture<String> getProcedures() { 2491 return this.<String> newMasterCaller() 2492 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2493 controller, stub, GetProceduresRequest.newBuilder().build(), 2494 (s, c, req, done) -> s.getProcedures(c, req, done), 2495 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2496 .call(); 2497 } 2498 2499 @Override 2500 public CompletableFuture<String> getLocks() { 2501 return this.<String> newMasterCaller() 2502 .action( 2503 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2504 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2505 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2506 .call(); 2507 } 2508 2509 @Override 2510 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2511 boolean offload) { 2512 return this.<Void> newMasterCaller() 2513 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2514 DecommissionRegionServersResponse, Void> call(controller, stub, 2515 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2516 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2517 .call(); 2518 } 2519 2520 @Override 2521 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2522 return this.<List<ServerName>> newMasterCaller() 2523 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2524 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2525 ListDecommissionedRegionServersRequest.newBuilder().build(), 2526 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2527 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2528 .collect(Collectors.toList()))) 2529 .call(); 2530 } 2531 2532 @Override 2533 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2534 List<byte[]> encodedRegionNames) { 2535 return this.<Void> newMasterCaller() 2536 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2537 RecommissionRegionServerResponse, Void> call(controller, stub, 2538 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2539 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2540 .call(); 2541 } 2542 2543 /** 2544 * Get the region location for the passed region name. The region name may be a full region name 2545 * or encoded region name. If the region does not found, then it'll throw an 2546 * UnknownRegionException wrapped by a {@link CompletableFuture} 2547 * @param regionNameOrEncodedRegionName region name or encoded region name 2548 * @return region location, wrapped by a {@link CompletableFuture} 2549 */ 2550 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2551 if (regionNameOrEncodedRegionName == null) { 2552 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2553 } 2554 2555 CompletableFuture<Optional<HRegionLocation>> future; 2556 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2557 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2558 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2559 // old format encodedName, should be meta region 2560 future = connection.registry.getMetaRegionLocations() 2561 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2562 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2563 } else { 2564 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2565 regionNameOrEncodedRegionName); 2566 } 2567 } else { 2568 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2569 // it needs to throw out IllegalArgumentException in case tableName is passed in. 2570 RegionInfo regionInfo; 2571 try { 2572 regionInfo = 2573 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2574 } catch (IOException ioe) { 2575 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2576 } 2577 2578 if (regionInfo.isMetaRegion()) { 2579 future = connection.registry.getMetaRegionLocations() 2580 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2581 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2582 .findFirst()); 2583 } else { 2584 future = 2585 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2586 } 2587 } 2588 2589 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2590 addListener(future, (location, err) -> { 2591 if (err != null) { 2592 returnedFuture.completeExceptionally(err); 2593 return; 2594 } 2595 if (!location.isPresent() || location.get().getRegion() == null) { 2596 returnedFuture.completeExceptionally( 2597 new UnknownRegionException("Invalid region name or encoded region name: " 2598 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2599 } else { 2600 returnedFuture.complete(location.get()); 2601 } 2602 }); 2603 return returnedFuture; 2604 } 2605 2606 /** 2607 * Get the region info for the passed region name. The region name may be a full region name or 2608 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2609 * wrapped by a {@link CompletableFuture} 2610 * @return region info, wrapped by a {@link CompletableFuture} 2611 */ 2612 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2613 if (regionNameOrEncodedRegionName == null) { 2614 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2615 } 2616 2617 if ( 2618 Bytes.equals(regionNameOrEncodedRegionName, 2619 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2620 || Bytes.equals(regionNameOrEncodedRegionName, 2621 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2622 ) { 2623 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2624 } 2625 2626 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2627 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2628 if (err != null) { 2629 future.completeExceptionally(err); 2630 } else { 2631 future.complete(location.getRegion()); 2632 } 2633 }); 2634 return future; 2635 } 2636 2637 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2638 if (numRegions < 3) { 2639 throw new IllegalArgumentException("Must create at least three regions"); 2640 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2641 throw new IllegalArgumentException("Start key must be smaller than end key"); 2642 } 2643 if (numRegions == 3) { 2644 return new byte[][] { startKey, endKey }; 2645 } 2646 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2647 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2648 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2649 } 2650 return splitKeys; 2651 } 2652 2653 private void verifySplitKeys(byte[][] splitKeys) { 2654 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2655 // Verify there are no duplicate split keys 2656 byte[] lastKey = null; 2657 for (byte[] splitKey : splitKeys) { 2658 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2659 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2660 } 2661 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2662 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2663 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2664 } 2665 lastKey = splitKey; 2666 } 2667 } 2668 2669 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2670 2671 abstract void onFinished(); 2672 2673 abstract void onError(Throwable error); 2674 2675 @Override 2676 public void accept(Void v, Throwable error) { 2677 if (error != null) { 2678 onError(error); 2679 return; 2680 } 2681 onFinished(); 2682 } 2683 } 2684 2685 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2686 protected final TableName tableName; 2687 2688 TableProcedureBiConsumer(TableName tableName) { 2689 this.tableName = tableName; 2690 } 2691 2692 abstract String getOperationType(); 2693 2694 String getDescription() { 2695 return "Operation: " + getOperationType() + ", " + "Table Name: " 2696 + tableName.getNameWithNamespaceInclAsString(); 2697 } 2698 2699 @Override 2700 void onFinished() { 2701 LOG.info(getDescription() + " completed"); 2702 } 2703 2704 @Override 2705 void onError(Throwable error) { 2706 LOG.info(getDescription() + " failed with " + error.getMessage()); 2707 } 2708 } 2709 2710 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2711 protected final String namespaceName; 2712 2713 NamespaceProcedureBiConsumer(String namespaceName) { 2714 this.namespaceName = namespaceName; 2715 } 2716 2717 abstract String getOperationType(); 2718 2719 String getDescription() { 2720 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2721 } 2722 2723 @Override 2724 void onFinished() { 2725 LOG.info(getDescription() + " completed"); 2726 } 2727 2728 @Override 2729 void onError(Throwable error) { 2730 LOG.info(getDescription() + " failed with " + error.getMessage()); 2731 } 2732 } 2733 2734 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2735 2736 CreateTableProcedureBiConsumer(TableName tableName) { 2737 super(tableName); 2738 } 2739 2740 @Override 2741 String getOperationType() { 2742 return "CREATE"; 2743 } 2744 } 2745 2746 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2747 2748 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2749 super(tableName); 2750 } 2751 2752 @Override 2753 String getOperationType() { 2754 return "MODIFY"; 2755 } 2756 } 2757 2758 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2759 extends TableProcedureBiConsumer { 2760 2761 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2762 super(tableName); 2763 } 2764 2765 @Override 2766 String getOperationType() { 2767 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2768 } 2769 } 2770 2771 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2772 2773 DeleteTableProcedureBiConsumer(TableName tableName) { 2774 super(tableName); 2775 } 2776 2777 @Override 2778 String getOperationType() { 2779 return "DELETE"; 2780 } 2781 2782 @Override 2783 void onFinished() { 2784 connection.getLocator().clearCache(this.tableName); 2785 super.onFinished(); 2786 } 2787 } 2788 2789 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2790 2791 TruncateTableProcedureBiConsumer(TableName tableName) { 2792 super(tableName); 2793 } 2794 2795 @Override 2796 String getOperationType() { 2797 return "TRUNCATE"; 2798 } 2799 } 2800 2801 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2802 2803 EnableTableProcedureBiConsumer(TableName tableName) { 2804 super(tableName); 2805 } 2806 2807 @Override 2808 String getOperationType() { 2809 return "ENABLE"; 2810 } 2811 } 2812 2813 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2814 2815 DisableTableProcedureBiConsumer(TableName tableName) { 2816 super(tableName); 2817 } 2818 2819 @Override 2820 String getOperationType() { 2821 return "DISABLE"; 2822 } 2823 } 2824 2825 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2826 2827 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2828 super(tableName); 2829 } 2830 2831 @Override 2832 String getOperationType() { 2833 return "ADD_COLUMN_FAMILY"; 2834 } 2835 } 2836 2837 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2838 2839 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2840 super(tableName); 2841 } 2842 2843 @Override 2844 String getOperationType() { 2845 return "DELETE_COLUMN_FAMILY"; 2846 } 2847 } 2848 2849 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2850 2851 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2852 super(tableName); 2853 } 2854 2855 @Override 2856 String getOperationType() { 2857 return "MODIFY_COLUMN_FAMILY"; 2858 } 2859 } 2860 2861 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2862 extends TableProcedureBiConsumer { 2863 2864 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2865 super(tableName); 2866 } 2867 2868 @Override 2869 String getOperationType() { 2870 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2871 } 2872 } 2873 2874 private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { 2875 2876 FlushTableProcedureBiConsumer(TableName tableName) { 2877 super(tableName); 2878 } 2879 2880 @Override 2881 String getOperationType() { 2882 return "FLUSH"; 2883 } 2884 } 2885 2886 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2887 2888 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2889 super(namespaceName); 2890 } 2891 2892 @Override 2893 String getOperationType() { 2894 return "CREATE_NAMESPACE"; 2895 } 2896 } 2897 2898 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2899 2900 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2901 super(namespaceName); 2902 } 2903 2904 @Override 2905 String getOperationType() { 2906 return "DELETE_NAMESPACE"; 2907 } 2908 } 2909 2910 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2911 2912 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2913 super(namespaceName); 2914 } 2915 2916 @Override 2917 String getOperationType() { 2918 return "MODIFY_NAMESPACE"; 2919 } 2920 } 2921 2922 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2923 2924 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2925 super(tableName); 2926 } 2927 2928 @Override 2929 String getOperationType() { 2930 return "MERGE_REGIONS"; 2931 } 2932 } 2933 2934 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2935 2936 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2937 super(tableName); 2938 } 2939 2940 @Override 2941 String getOperationType() { 2942 return "SPLIT_REGION"; 2943 } 2944 } 2945 2946 private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2947 2948 TruncateRegionProcedureBiConsumer(TableName tableName) { 2949 super(tableName); 2950 } 2951 2952 @Override 2953 String getOperationType() { 2954 return "TRUNCATE_REGION"; 2955 } 2956 } 2957 2958 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 2959 SnapshotProcedureBiConsumer(TableName tableName) { 2960 super(tableName); 2961 } 2962 2963 @Override 2964 String getOperationType() { 2965 return "SNAPSHOT"; 2966 } 2967 } 2968 2969 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2970 private final String peerId; 2971 private final Supplier<String> getOperation; 2972 2973 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2974 this.peerId = peerId; 2975 this.getOperation = getOperation; 2976 } 2977 2978 String getDescription() { 2979 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2980 } 2981 2982 @Override 2983 void onFinished() { 2984 LOG.info(getDescription() + " completed"); 2985 } 2986 2987 @Override 2988 void onError(Throwable error) { 2989 LOG.info(getDescription() + " failed with " + error.getMessage()); 2990 } 2991 } 2992 2993 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2994 CompletableFuture<Void> future = new CompletableFuture<>(); 2995 addListener(procFuture, (procId, error) -> { 2996 if (error != null) { 2997 future.completeExceptionally(error); 2998 return; 2999 } 3000 getProcedureResult(procId, future, 0); 3001 }); 3002 return future; 3003 } 3004 3005 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 3006 addListener( 3007 this.<GetProcedureResultResponse> newMasterCaller() 3008 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 3009 GetProcedureResultResponse> call(controller, stub, 3010 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 3011 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 3012 .call(), 3013 (response, error) -> { 3014 if (error != null) { 3015 LOG.warn("failed to get the procedure result procId={}", procId, 3016 ConnectionUtils.translateException(error)); 3017 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 3018 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3019 return; 3020 } 3021 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 3022 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 3023 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3024 return; 3025 } 3026 if (response.hasException()) { 3027 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 3028 future.completeExceptionally(ioe); 3029 } else { 3030 future.complete(null); 3031 } 3032 }); 3033 } 3034 3035 private <T> CompletableFuture<T> failedFuture(Throwable error) { 3036 CompletableFuture<T> future = new CompletableFuture<>(); 3037 future.completeExceptionally(error); 3038 return future; 3039 } 3040 3041 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 3042 if (error != null) { 3043 future.completeExceptionally(error); 3044 return true; 3045 } 3046 return false; 3047 } 3048 3049 @Override 3050 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 3051 return getClusterMetrics(EnumSet.allOf(Option.class)); 3052 } 3053 3054 @Override 3055 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 3056 return this.<ClusterMetrics> newMasterCaller() 3057 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 3058 ClusterMetrics> call(controller, stub, 3059 RequestConverter.buildGetClusterStatusRequest(options), 3060 (s, c, req, done) -> s.getClusterStatus(c, req, done), 3061 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 3062 .call(); 3063 } 3064 3065 @Override 3066 public CompletableFuture<Void> shutdown() { 3067 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3068 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 3069 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 3070 resp -> null)) 3071 .call(); 3072 } 3073 3074 @Override 3075 public CompletableFuture<Void> stopMaster() { 3076 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3077 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 3078 controller, stub, StopMasterRequest.newBuilder().build(), 3079 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 3080 .call(); 3081 } 3082 3083 @Override 3084 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 3085 StopServerRequest request = RequestConverter 3086 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 3087 return this.<Void> newAdminCaller().priority(HIGH_QOS) 3088 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 3089 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 3090 resp -> null)) 3091 .serverName(serverName).call(); 3092 } 3093 3094 @Override 3095 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 3096 return this.<Void> newAdminCaller() 3097 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 3098 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 3099 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 3100 .serverName(serverName).call(); 3101 } 3102 3103 @Override 3104 public CompletableFuture<Void> updateConfiguration() { 3105 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3106 addListener( 3107 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 3108 (status, err) -> { 3109 if (err != null) { 3110 future.completeExceptionally(err); 3111 } else { 3112 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3113 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 3114 futures.add(updateConfiguration(status.getMasterName())); 3115 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 3116 addListener( 3117 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3118 (result, err2) -> { 3119 if (err2 != null) { 3120 future.completeExceptionally(err2); 3121 } else { 3122 future.complete(result); 3123 } 3124 }); 3125 } 3126 }); 3127 return future; 3128 } 3129 3130 @Override 3131 public CompletableFuture<Void> updateConfiguration(String groupName) { 3132 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3133 addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3134 if (err != null) { 3135 future.completeExceptionally(err); 3136 } else if (rsGroupInfo == null) { 3137 future.completeExceptionally( 3138 new IllegalArgumentException("Group does not exist: " + groupName)); 3139 } else { 3140 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3141 if (err2 != null) { 3142 future.completeExceptionally(err2); 3143 } else { 3144 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3145 List<ServerName> groupServers = status.getServersName().stream() 3146 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3147 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3148 addListener( 3149 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3150 (result, err3) -> { 3151 if (err3 != null) { 3152 future.completeExceptionally(err3); 3153 } else { 3154 future.complete(result); 3155 } 3156 }); 3157 } 3158 }); 3159 } 3160 }); 3161 return future; 3162 } 3163 3164 @Override 3165 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3166 return this.<Void> newAdminCaller() 3167 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3168 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3169 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3170 .serverName(serverName).call(); 3171 } 3172 3173 @Override 3174 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3175 return this.<Void> newAdminCaller() 3176 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3177 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3178 RequestConverter.buildClearCompactionQueuesRequest(queues), 3179 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3180 .serverName(serverName).call(); 3181 } 3182 3183 @Override 3184 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3185 return this.<List<SecurityCapability>> newMasterCaller() 3186 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3187 List<SecurityCapability>> call(controller, stub, 3188 SecurityCapabilitiesRequest.newBuilder().build(), 3189 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3190 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3191 .call(); 3192 } 3193 3194 @Override 3195 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3196 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3197 } 3198 3199 @Override 3200 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3201 TableName tableName) { 3202 Preconditions.checkNotNull(tableName, 3203 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3204 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3205 } 3206 3207 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3208 ServerName serverName) { 3209 return this.<List<RegionMetrics>> newAdminCaller() 3210 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3211 List<RegionMetrics>> adminCall(controller, stub, request, 3212 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3213 RegionMetricsBuilder::toRegionMetrics)) 3214 .serverName(serverName).call(); 3215 } 3216 3217 @Override 3218 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3219 return this.<Boolean> newMasterCaller() 3220 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3221 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3222 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3223 resp -> resp.getInMaintenanceMode())) 3224 .call(); 3225 } 3226 3227 @Override 3228 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3229 CompactType compactType) { 3230 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3231 3232 switch (compactType) { 3233 case MOB: 3234 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3235 if (err != null) { 3236 future.completeExceptionally(err); 3237 return; 3238 } 3239 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3240 3241 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3242 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3243 GetRegionInfoResponse> adminCall(controller, stub, 3244 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3245 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3246 .call(), (resp2, err2) -> { 3247 if (err2 != null) { 3248 future.completeExceptionally(err2); 3249 } else { 3250 if (resp2.hasCompactionState()) { 3251 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3252 } else { 3253 future.complete(CompactionState.NONE); 3254 } 3255 } 3256 }); 3257 }); 3258 break; 3259 case NORMAL: 3260 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3261 if (err != null) { 3262 future.completeExceptionally(err); 3263 return; 3264 } 3265 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3266 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3267 locations.stream().filter(loc -> loc.getServerName() != null) 3268 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3269 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3270 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3271 // If any region compaction state is MAJOR_AND_MINOR 3272 // the table compaction state is MAJOR_AND_MINOR, too. 3273 if (err2 != null) { 3274 future.completeExceptionally(unwrapCompletionException(err2)); 3275 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3276 future.complete(regionState); 3277 } else { 3278 regionStates.add(regionState); 3279 } 3280 })); 3281 }); 3282 addListener( 3283 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3284 (ret, err3) -> { 3285 // If future not completed, check all regions's compaction state 3286 if (!future.isCompletedExceptionally() && !future.isDone()) { 3287 CompactionState state = CompactionState.NONE; 3288 for (CompactionState regionState : regionStates) { 3289 switch (regionState) { 3290 case MAJOR: 3291 if (state == CompactionState.MINOR) { 3292 future.complete(CompactionState.MAJOR_AND_MINOR); 3293 } else { 3294 state = CompactionState.MAJOR; 3295 } 3296 break; 3297 case MINOR: 3298 if (state == CompactionState.MAJOR) { 3299 future.complete(CompactionState.MAJOR_AND_MINOR); 3300 } else { 3301 state = CompactionState.MINOR; 3302 } 3303 break; 3304 case NONE: 3305 default: 3306 } 3307 } 3308 if (!future.isDone()) { 3309 future.complete(state); 3310 } 3311 } 3312 }); 3313 }); 3314 break; 3315 default: 3316 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3317 } 3318 3319 return future; 3320 } 3321 3322 @Override 3323 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3324 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3325 addListener(getRegionLocation(regionName), (location, err) -> { 3326 if (err != null) { 3327 future.completeExceptionally(err); 3328 return; 3329 } 3330 ServerName serverName = location.getServerName(); 3331 if (serverName == null) { 3332 future 3333 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3334 return; 3335 } 3336 addListener( 3337 this.<GetRegionInfoResponse> newAdminCaller() 3338 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3339 GetRegionInfoResponse> adminCall(controller, stub, 3340 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3341 true), 3342 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3343 .serverName(serverName).call(), 3344 (resp2, err2) -> { 3345 if (err2 != null) { 3346 future.completeExceptionally(err2); 3347 } else { 3348 if (resp2.hasCompactionState()) { 3349 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3350 } else { 3351 future.complete(CompactionState.NONE); 3352 } 3353 } 3354 }); 3355 }); 3356 return future; 3357 } 3358 3359 @Override 3360 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3361 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3362 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3363 return this.<Optional<Long>> newMasterCaller() 3364 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3365 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3366 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3367 ProtobufUtil::toOptionalTimestamp)) 3368 .call(); 3369 } 3370 3371 @Override 3372 public CompletableFuture<Optional<Long>> 3373 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3374 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3375 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3376 addListener(getRegionInfo(regionName), (region, err) -> { 3377 if (err != null) { 3378 future.completeExceptionally(err); 3379 return; 3380 } 3381 MajorCompactionTimestampForRegionRequest.Builder builder = 3382 MajorCompactionTimestampForRegionRequest.newBuilder(); 3383 builder.setRegion( 3384 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3385 addListener(this.<Optional<Long>> newMasterCaller() 3386 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3387 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3388 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3389 ProtobufUtil::toOptionalTimestamp)) 3390 .call(), (timestamp, err2) -> { 3391 if (err2 != null) { 3392 future.completeExceptionally(err2); 3393 } else { 3394 future.complete(timestamp); 3395 } 3396 }); 3397 }); 3398 return future; 3399 } 3400 3401 @Override 3402 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3403 List<String> serverNamesList) { 3404 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3405 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3406 if (err != null) { 3407 future.completeExceptionally(err); 3408 return; 3409 } 3410 // Accessed by multiple threads. 3411 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3412 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3413 serverNames.stream().forEach(serverName -> { 3414 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3415 if (err2 != null) { 3416 future.completeExceptionally(unwrapCompletionException(err2)); 3417 } else { 3418 serverStates.put(serverName, serverState); 3419 } 3420 })); 3421 }); 3422 addListener( 3423 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3424 (ret, err3) -> { 3425 if (!future.isCompletedExceptionally()) { 3426 if (err3 != null) { 3427 future.completeExceptionally(err3); 3428 } else { 3429 future.complete(serverStates); 3430 } 3431 } 3432 }); 3433 }); 3434 return future; 3435 } 3436 3437 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3438 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3439 if (serverNamesList.isEmpty()) { 3440 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3441 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3442 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3443 if (err != null) { 3444 future.completeExceptionally(err); 3445 } else { 3446 future.complete(clusterMetrics.getServersName()); 3447 } 3448 }); 3449 return future; 3450 } else { 3451 List<ServerName> serverList = new ArrayList<>(); 3452 for (String regionServerName : serverNamesList) { 3453 ServerName serverName = null; 3454 try { 3455 serverName = ServerName.valueOf(regionServerName); 3456 } catch (Exception e) { 3457 future.completeExceptionally( 3458 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3459 } 3460 if (serverName == null) { 3461 future.completeExceptionally( 3462 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3463 } else { 3464 serverList.add(serverName); 3465 } 3466 } 3467 future.complete(serverList); 3468 } 3469 return future; 3470 } 3471 3472 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3473 return this.<Boolean> newAdminCaller().serverName(serverName) 3474 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3475 Boolean> adminCall(controller, stub, 3476 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3477 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3478 .call(); 3479 } 3480 3481 @Override 3482 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3483 return this.<Boolean> newMasterCaller() 3484 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3485 Boolean> call(controller, stub, 3486 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3487 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3488 (resp) -> resp.getPrevBalanceValue())) 3489 .call(); 3490 } 3491 3492 @Override 3493 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3494 return this.<BalanceResponse> newMasterCaller() 3495 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3496 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3497 (s, c, req, done) -> s.balance(c, req, done), 3498 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3499 .call(); 3500 } 3501 3502 @Override 3503 public CompletableFuture<Boolean> isBalancerEnabled() { 3504 return this.<Boolean> newMasterCaller() 3505 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3506 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3507 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3508 .call(); 3509 } 3510 3511 @Override 3512 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3513 return this.<Boolean> newMasterCaller() 3514 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3515 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3516 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3517 (resp) -> resp.getPrevNormalizerValue())) 3518 .call(); 3519 } 3520 3521 @Override 3522 public CompletableFuture<Boolean> isNormalizerEnabled() { 3523 return this.<Boolean> newMasterCaller() 3524 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3525 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3526 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3527 .call(); 3528 } 3529 3530 @Override 3531 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3532 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3533 } 3534 3535 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3536 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3537 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3538 } 3539 3540 @Override 3541 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3542 return this.<Boolean> newMasterCaller() 3543 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3544 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3545 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3546 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3547 (resp) -> resp.getPrevValue())) 3548 .call(); 3549 } 3550 3551 @Override 3552 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3553 return this.<Boolean> newMasterCaller() 3554 .action( 3555 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3556 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3557 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3558 .call(); 3559 } 3560 3561 @Override 3562 public CompletableFuture<Boolean> runCleanerChore() { 3563 return this.<Boolean> newMasterCaller() 3564 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3565 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3566 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3567 (resp) -> resp.getCleanerChoreRan())) 3568 .call(); 3569 } 3570 3571 @Override 3572 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3573 return this.<Boolean> newMasterCaller() 3574 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3575 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3576 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3577 .call(); 3578 } 3579 3580 @Override 3581 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3582 return this.<Boolean> newMasterCaller() 3583 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3584 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3585 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3586 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3587 .call(); 3588 } 3589 3590 @Override 3591 public CompletableFuture<Integer> runCatalogJanitor() { 3592 return this.<Integer> newMasterCaller() 3593 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3594 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3595 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3596 .call(); 3597 } 3598 3599 @Override 3600 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3601 ServiceCaller<S, R> callable) { 3602 MasterCoprocessorRpcChannelImpl channel = 3603 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3604 S stub = stubMaker.apply(channel); 3605 CompletableFuture<R> future = new CompletableFuture<>(); 3606 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3607 callable.call(stub, controller, resp -> { 3608 if (controller.failed()) { 3609 future.completeExceptionally(controller.getFailed()); 3610 } else { 3611 future.complete(resp); 3612 } 3613 }); 3614 return future; 3615 } 3616 3617 @Override 3618 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3619 ServiceCaller<S, R> callable, ServerName serverName) { 3620 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3621 this.<Message> newServerCaller().serverName(serverName)); 3622 S stub = stubMaker.apply(channel); 3623 CompletableFuture<R> future = new CompletableFuture<>(); 3624 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3625 callable.call(stub, controller, resp -> { 3626 if (controller.failed()) { 3627 future.completeExceptionally(controller.getFailed()); 3628 } else { 3629 future.complete(resp); 3630 } 3631 }); 3632 return future; 3633 } 3634 3635 @Override 3636 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3637 return this.<List<ServerName>> newMasterCaller() 3638 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3639 List<ServerName>> call(controller, stub, 3640 RequestConverter.buildClearDeadServersRequest(servers), 3641 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3642 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3643 .call(); 3644 } 3645 3646 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3647 return this.connection.callerFactory.<T> serverRequest() 3648 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3649 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3650 .pause(pauseNs, TimeUnit.NANOSECONDS) 3651 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3652 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3653 } 3654 3655 @Override 3656 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3657 if (tableName == null) { 3658 return failedFuture(new IllegalArgumentException("Table name is null")); 3659 } 3660 CompletableFuture<Void> future = new CompletableFuture<>(); 3661 addListener(tableExists(tableName), (exist, err) -> { 3662 if (err != null) { 3663 future.completeExceptionally(err); 3664 return; 3665 } 3666 if (!exist) { 3667 future.completeExceptionally(new TableNotFoundException( 3668 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3669 return; 3670 } 3671 addListener(getTableSplits(tableName), (splits, err1) -> { 3672 if (err1 != null) { 3673 future.completeExceptionally(err1); 3674 } else { 3675 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3676 if (err2 != null) { 3677 future.completeExceptionally(err2); 3678 } else { 3679 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3680 if (err3 != null) { 3681 future.completeExceptionally(err3); 3682 } else { 3683 future.complete(result3); 3684 } 3685 }); 3686 } 3687 }); 3688 } 3689 }); 3690 }); 3691 return future; 3692 } 3693 3694 @Override 3695 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3696 if (tableName == null) { 3697 return failedFuture(new IllegalArgumentException("Table name is null")); 3698 } 3699 CompletableFuture<Void> future = new CompletableFuture<>(); 3700 addListener(tableExists(tableName), (exist, err) -> { 3701 if (err != null) { 3702 future.completeExceptionally(err); 3703 return; 3704 } 3705 if (!exist) { 3706 future.completeExceptionally(new TableNotFoundException( 3707 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3708 return; 3709 } 3710 addListener(setTableReplication(tableName, false), (result, err2) -> { 3711 if (err2 != null) { 3712 future.completeExceptionally(err2); 3713 } else { 3714 future.complete(result); 3715 } 3716 }); 3717 }); 3718 return future; 3719 } 3720 3721 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3722 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3723 addListener( 3724 getRegions(tableName).thenApply(regions -> regions.stream() 3725 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3726 (regions, err2) -> { 3727 if (err2 != null) { 3728 future.completeExceptionally(err2); 3729 return; 3730 } 3731 if (regions.size() == 1) { 3732 future.complete(null); 3733 } else { 3734 byte[][] splits = new byte[regions.size() - 1][]; 3735 for (int i = 1; i < regions.size(); i++) { 3736 splits[i - 1] = regions.get(i).getStartKey(); 3737 } 3738 future.complete(splits); 3739 } 3740 }); 3741 return future; 3742 } 3743 3744 /** 3745 * Connect to peer and check the table descriptor on peer: 3746 * <ol> 3747 * <li>Create the same table on peer when not exist.</li> 3748 * <li>Throw an exception if the table already has replication enabled on any of the column 3749 * families.</li> 3750 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3751 * </ol> 3752 * @param tableName name of the table to sync to the peer 3753 * @param splits table split keys 3754 */ 3755 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3756 byte[][] splits) { 3757 CompletableFuture<Void> future = new CompletableFuture<>(); 3758 addListener(listReplicationPeers(), (peers, err) -> { 3759 if (err != null) { 3760 future.completeExceptionally(err); 3761 return; 3762 } 3763 if (peers == null || peers.size() <= 0) { 3764 future.completeExceptionally( 3765 new IllegalArgumentException("Found no peer cluster for replication.")); 3766 return; 3767 } 3768 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3769 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3770 .forEach(peer -> { 3771 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3772 }); 3773 addListener( 3774 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3775 (result, err2) -> { 3776 if (err2 != null) { 3777 future.completeExceptionally(err2); 3778 } else { 3779 future.complete(result); 3780 } 3781 }); 3782 }); 3783 return future; 3784 } 3785 3786 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3787 ReplicationPeerDescription peer) { 3788 Configuration peerConf = null; 3789 try { 3790 peerConf = 3791 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3792 } catch (IOException e) { 3793 return failedFuture(e); 3794 } 3795 CompletableFuture<Void> future = new CompletableFuture<>(); 3796 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3797 if (err != null) { 3798 future.completeExceptionally(err); 3799 return; 3800 } 3801 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3802 if (err1 != null) { 3803 future.completeExceptionally(err1); 3804 return; 3805 } 3806 AsyncAdmin peerAdmin = conn.getAdmin(); 3807 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3808 if (err2 != null) { 3809 future.completeExceptionally(err2); 3810 return; 3811 } 3812 if (!exist) { 3813 CompletableFuture<Void> createTableFuture = null; 3814 if (splits == null) { 3815 createTableFuture = peerAdmin.createTable(tableDesc); 3816 } else { 3817 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3818 } 3819 addListener(createTableFuture, (result, err3) -> { 3820 if (err3 != null) { 3821 future.completeExceptionally(err3); 3822 } else { 3823 future.complete(result); 3824 } 3825 }); 3826 } else { 3827 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3828 (result, err4) -> { 3829 if (err4 != null) { 3830 future.completeExceptionally(err4); 3831 } else { 3832 future.complete(result); 3833 } 3834 }); 3835 } 3836 }); 3837 }); 3838 }); 3839 return future; 3840 } 3841 3842 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3843 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3844 CompletableFuture<Void> future = new CompletableFuture<>(); 3845 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3846 if (err != null) { 3847 future.completeExceptionally(err); 3848 return; 3849 } 3850 if (peerTableDesc == null) { 3851 future.completeExceptionally( 3852 new IllegalArgumentException("Failed to get table descriptor for table " 3853 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3854 return; 3855 } 3856 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3857 future.completeExceptionally(new IllegalArgumentException( 3858 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3859 + ", but the table descriptors are not same when compared with source cluster." 3860 + " Thus can not enable the table's replication switch.")); 3861 return; 3862 } 3863 future.complete(null); 3864 }); 3865 return future; 3866 } 3867 3868 /** 3869 * Set the table's replication switch if the table's replication switch is already not set. 3870 * @param tableName name of the table 3871 * @param enableRep is replication switch enable or disable 3872 */ 3873 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3874 CompletableFuture<Void> future = new CompletableFuture<>(); 3875 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3876 if (err != null) { 3877 future.completeExceptionally(err); 3878 return; 3879 } 3880 if (!tableDesc.matchReplicationScope(enableRep)) { 3881 int scope = 3882 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3883 TableDescriptor newTableDesc = 3884 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3885 addListener(modifyTable(newTableDesc), (result, err2) -> { 3886 if (err2 != null) { 3887 future.completeExceptionally(err2); 3888 } else { 3889 future.complete(result); 3890 } 3891 }); 3892 } else { 3893 future.complete(null); 3894 } 3895 }); 3896 return future; 3897 } 3898 3899 @Override 3900 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 3901 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 3902 request.setPeerId(peerId); 3903 return this.<Boolean> newMasterCaller() 3904 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 3905 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 3906 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 3907 resp -> resp.getIsEnabled())) 3908 .call(); 3909 } 3910 3911 private void waitUntilAllReplicationPeerModificationProceduresDone( 3912 CompletableFuture<Boolean> future, boolean prevOn, int retries) { 3913 CompletableFuture<List<ProcedureProtos.Procedure>> callFuture = 3914 this.<List<ProcedureProtos.Procedure>> newMasterCaller() 3915 .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest, 3916 GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call( 3917 controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), 3918 (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), 3919 resp -> resp.getProcedureList())) 3920 .call(); 3921 addListener(callFuture, (r, e) -> { 3922 if (e != null) { 3923 future.completeExceptionally(e); 3924 } else if (r.isEmpty()) { 3925 // we are done 3926 future.complete(prevOn); 3927 } else { 3928 // retry later to see if the procedures are done 3929 retryTimer.newTimeout( 3930 t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), 3931 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3932 } 3933 }); 3934 } 3935 3936 @Override 3937 public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, 3938 boolean drainProcedures) { 3939 ReplicationPeerModificationSwitchRequest request = 3940 ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); 3941 CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller() 3942 .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest, 3943 ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request, 3944 (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), 3945 resp -> resp.getPreviousValue())) 3946 .call(); 3947 // if we do not need to wait all previous peer modification procedure done, or we are enabling 3948 // peer modification, just return here. 3949 if (!drainProcedures || on) { 3950 return callFuture; 3951 } 3952 // otherwise we need to wait until all previous peer modification procedure done 3953 CompletableFuture<Boolean> future = new CompletableFuture<>(); 3954 addListener(callFuture, (prevOn, err) -> { 3955 if (err != null) { 3956 future.completeExceptionally(err); 3957 return; 3958 } 3959 // even if the previous state is disabled, we still need to wait here, as there could be 3960 // another client thread which called this method just before us and have already changed the 3961 // state to off, but there are still peer modification procedures not finished, so we should 3962 // also wait here. 3963 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 3964 }); 3965 return future; 3966 } 3967 3968 @Override 3969 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 3970 return this.<Boolean> newMasterCaller() 3971 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 3972 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 3973 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 3974 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 3975 (resp) -> resp.getEnabled())) 3976 .call(); 3977 } 3978 3979 @Override 3980 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3981 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3982 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3983 if (err != null) { 3984 future.completeExceptionally(err); 3985 return; 3986 } 3987 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3988 locations.stream().filter(l -> l.getRegion() != null) 3989 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3990 .collect(Collectors.groupingBy(l -> l.getServerName(), 3991 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3992 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3993 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3994 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3995 futures 3996 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3997 if (err2 != null) { 3998 future.completeExceptionally(unwrapCompletionException(err2)); 3999 } else { 4000 aggregator.append(stats); 4001 } 4002 })); 4003 } 4004 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 4005 (ret, err3) -> { 4006 if (err3 != null) { 4007 future.completeExceptionally(unwrapCompletionException(err3)); 4008 } else { 4009 future.complete(aggregator.sum()); 4010 } 4011 }); 4012 }); 4013 return future; 4014 } 4015 4016 @Override 4017 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 4018 boolean preserveSplits) { 4019 CompletableFuture<Void> future = new CompletableFuture<>(); 4020 addListener(tableExists(tableName), (exist, err) -> { 4021 if (err != null) { 4022 future.completeExceptionally(err); 4023 return; 4024 } 4025 if (!exist) { 4026 future.completeExceptionally(new TableNotFoundException(tableName)); 4027 return; 4028 } 4029 addListener(tableExists(newTableName), (exist1, err1) -> { 4030 if (err1 != null) { 4031 future.completeExceptionally(err1); 4032 return; 4033 } 4034 if (exist1) { 4035 future.completeExceptionally(new TableExistsException(newTableName)); 4036 return; 4037 } 4038 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 4039 if (err2 != null) { 4040 future.completeExceptionally(err2); 4041 return; 4042 } 4043 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 4044 if (preserveSplits) { 4045 addListener(getTableSplits(tableName), (splits, err3) -> { 4046 if (err3 != null) { 4047 future.completeExceptionally(err3); 4048 } else { 4049 addListener( 4050 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 4051 (result, err4) -> { 4052 if (err4 != null) { 4053 future.completeExceptionally(err4); 4054 } else { 4055 future.complete(result); 4056 } 4057 }); 4058 } 4059 }); 4060 } else { 4061 addListener(createTable(newTableDesc), (result, err5) -> { 4062 if (err5 != null) { 4063 future.completeExceptionally(err5); 4064 } else { 4065 future.complete(result); 4066 } 4067 }); 4068 } 4069 }); 4070 }); 4071 }); 4072 return future; 4073 } 4074 4075 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 4076 List<RegionInfo> hris) { 4077 return this.<CacheEvictionStats> newAdminCaller() 4078 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 4079 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 4080 RequestConverter.buildClearRegionBlockCacheRequest(hris), 4081 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 4082 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 4083 .serverName(serverName).call(); 4084 } 4085 4086 @Override 4087 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 4088 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4089 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 4090 Boolean> call(controller, stub, 4091 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 4092 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 4093 resp -> resp.getPreviousRpcThrottleEnabled())) 4094 .call(); 4095 return future; 4096 } 4097 4098 @Override 4099 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 4100 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4101 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 4102 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 4103 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 4104 resp -> resp.getRpcThrottleEnabled())) 4105 .call(); 4106 return future; 4107 } 4108 4109 @Override 4110 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 4111 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4112 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 4113 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 4114 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 4115 .build(), 4116 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 4117 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 4118 .call(); 4119 return future; 4120 } 4121 4122 @Override 4123 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 4124 return this.<Map<TableName, Long>> newMasterCaller() 4125 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 4126 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4127 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4128 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4129 resp -> resp.getSizesList().stream().collect(Collectors 4130 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4131 .call(); 4132 } 4133 4134 @Override 4135 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4136 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4137 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4138 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4139 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4140 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4141 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4142 resp -> resp.getSnapshotsList().stream() 4143 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 4144 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4145 .serverName(serverName).call(); 4146 } 4147 4148 private CompletableFuture<SpaceQuotaSnapshot> 4149 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4150 return this.<SpaceQuotaSnapshot> newMasterCaller() 4151 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4152 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4153 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4154 .call(); 4155 } 4156 4157 @Override 4158 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4159 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4160 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4161 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4162 } 4163 4164 @Override 4165 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4166 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4167 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4168 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4169 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4170 } 4171 4172 @Override 4173 public CompletableFuture<Void> grant(UserPermission userPermission, 4174 boolean mergeExistingPermissions) { 4175 return this.<Void> newMasterCaller() 4176 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4177 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4178 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4179 .call(); 4180 } 4181 4182 @Override 4183 public CompletableFuture<Void> revoke(UserPermission userPermission) { 4184 return this.<Void> newMasterCaller() 4185 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4186 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4187 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4188 .call(); 4189 } 4190 4191 @Override 4192 public CompletableFuture<List<UserPermission>> 4193 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4194 return this.<List<UserPermission>> newMasterCaller() 4195 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4196 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4197 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4198 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4199 resp -> resp.getUserPermissionList().stream() 4200 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4201 .collect(Collectors.toList()))) 4202 .call(); 4203 } 4204 4205 @Override 4206 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4207 List<Permission> permissions) { 4208 return this.<List<Boolean>> newMasterCaller() 4209 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4210 List<Boolean>> call(controller, stub, 4211 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4212 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4213 resp -> resp.getHasUserPermissionList())) 4214 .call(); 4215 } 4216 4217 @Override 4218 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4219 return this.<Boolean> newMasterCaller() 4220 .action((controller, stub) -> this.call(controller, stub, 4221 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4222 MasterService.Interface::switchSnapshotCleanup, 4223 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4224 .call(); 4225 } 4226 4227 @Override 4228 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4229 return this.<Boolean> newMasterCaller() 4230 .action((controller, stub) -> this.call(controller, stub, 4231 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4232 MasterService.Interface::isSnapshotCleanupEnabled, 4233 IsSnapshotCleanupEnabledResponse::getEnabled)) 4234 .call(); 4235 } 4236 4237 @Override 4238 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4239 return this.<Void> newMasterCaller() 4240 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4241 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4242 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4243 .call(); 4244 } 4245 4246 @Override 4247 public CompletableFuture<Void> addRSGroup(String groupName) { 4248 return this.<Void> newMasterCaller() 4249 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4250 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4251 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4252 .call(); 4253 } 4254 4255 @Override 4256 public CompletableFuture<Void> removeRSGroup(String groupName) { 4257 return this.<Void> newMasterCaller() 4258 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4259 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4260 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4261 .call(); 4262 } 4263 4264 @Override 4265 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4266 BalanceRequest request) { 4267 return this.<BalanceResponse> newMasterCaller() 4268 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4269 BalanceResponse> call(controller, stub, 4270 ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4271 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4272 .call(); 4273 } 4274 4275 @Override 4276 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4277 return this.<List<RSGroupInfo>> newMasterCaller() 4278 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4279 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4280 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4281 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4282 .call(); 4283 } 4284 4285 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4286 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4287 final String logType) { 4288 if (CollectionUtils.isEmpty(serverNames)) { 4289 return CompletableFuture.completedFuture(Collections.emptyList()); 4290 } 4291 return CompletableFuture.supplyAsync(() -> serverNames 4292 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4293 filterParams, limit, logType)) 4294 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4295 } 4296 4297 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4298 Map<String, Object> filterParams, int limit, String logType) { 4299 return this.<List<LogEntry>> newAdminCaller() 4300 .action((controller, stub) -> this.adminCall(controller, stub, 4301 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4302 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4303 .serverName(serverName).call(); 4304 } 4305 4306 @Override 4307 public CompletableFuture<List<Boolean>> 4308 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4309 if (CollectionUtils.isEmpty(serverNames)) { 4310 return CompletableFuture.completedFuture(Collections.emptyList()); 4311 } 4312 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4313 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4314 return convertToFutureOfList(clearSlowLogResponseList); 4315 } 4316 4317 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4318 return this.<Boolean> newAdminCaller() 4319 .action((controller, stub) -> this.adminCall(controller, stub, 4320 RequestConverter.buildClearSlowLogResponseRequest(), 4321 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4322 .serverName(serverName).call(); 4323 } 4324 4325 private static <T> CompletableFuture<List<T>> 4326 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4327 CompletableFuture<Void> allDoneFuture = 4328 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4329 return allDoneFuture 4330 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4331 } 4332 4333 @Override 4334 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4335 return this.<List<TableName>> newMasterCaller() 4336 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4337 List<TableName>> call(controller, stub, 4338 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4339 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4340 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4341 .call(); 4342 } 4343 4344 @Override 4345 public CompletableFuture<Pair<List<String>, List<TableName>>> 4346 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4347 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4348 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4349 GetConfiguredNamespacesAndTablesInRSGroupResponse, 4350 Pair<List<String>, List<TableName>>> call(controller, stub, 4351 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4352 .build(), 4353 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4354 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4355 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4356 .call(); 4357 } 4358 4359 @Override 4360 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4361 return this.<RSGroupInfo> newMasterCaller() 4362 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4363 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4364 GetRSGroupInfoOfServerRequest.newBuilder() 4365 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4366 .setPort(hostPort.getPort()).build()) 4367 .build(), 4368 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4369 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4370 .call(); 4371 } 4372 4373 @Override 4374 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4375 return this.<Void> newMasterCaller() 4376 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4377 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4378 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4379 .call(); 4380 } 4381 4382 @Override 4383 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4384 CompletableFuture<Void> future = new CompletableFuture<>(); 4385 for (TableName tableName : tables) { 4386 addListener(tableExists(tableName), (exist, err) -> { 4387 if (err != null) { 4388 future.completeExceptionally(err); 4389 return; 4390 } 4391 if (!exist) { 4392 future.completeExceptionally(new TableNotFoundException(tableName)); 4393 return; 4394 } 4395 }); 4396 } 4397 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4398 if (err != null) { 4399 future.completeExceptionally(err); 4400 return; 4401 } 4402 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4403 future.complete(null); 4404 return; 4405 } 4406 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4407 for (TableDescriptor td : tableDescriptions) { 4408 newTableDescriptors 4409 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4410 } 4411 addListener( 4412 CompletableFuture.allOf( 4413 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4414 (v, e) -> { 4415 if (e != null) { 4416 future.completeExceptionally(e); 4417 } else { 4418 future.complete(v); 4419 } 4420 }); 4421 }); 4422 return future; 4423 } 4424 4425 @Override 4426 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4427 return this.<RSGroupInfo> newMasterCaller() 4428 .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4429 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4430 GetRSGroupInfoOfTableRequest.newBuilder() 4431 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4432 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4433 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4434 .call(); 4435 } 4436 4437 @Override 4438 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4439 return this.<RSGroupInfo> newMasterCaller() 4440 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4441 RSGroupInfo> call(controller, stub, 4442 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4443 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4444 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4445 .call(); 4446 } 4447 4448 @Override 4449 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4450 return this.<Void> newMasterCaller() 4451 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4452 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4453 .setNewRsgroupName(newName).build(), 4454 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4455 .call(); 4456 } 4457 4458 @Override 4459 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4460 Map<String, String> configuration) { 4461 UpdateRSGroupConfigRequest.Builder request = 4462 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4463 if (configuration != null) { 4464 configuration.entrySet().forEach(e -> request.addConfiguration( 4465 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4466 } 4467 return this.<Void> newMasterCaller() 4468 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4469 Void> call(controller, stub, request.build(), 4470 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4471 .call(); 4472 } 4473 4474 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4475 return this.<List<LogEntry>> newMasterCaller() 4476 .action((controller, stub) -> this.call(controller, stub, 4477 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4478 ProtobufUtil::toBalancerDecisionResponse)) 4479 .call(); 4480 } 4481 4482 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4483 return this.<List<LogEntry>> newMasterCaller() 4484 .action((controller, stub) -> this.call(controller, stub, 4485 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4486 ProtobufUtil::toBalancerRejectionResponse)) 4487 .call(); 4488 } 4489 4490 @Override 4491 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4492 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4493 if (logType == null || serverType == null) { 4494 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4495 } 4496 switch (logType) { 4497 case "SLOW_LOG": 4498 case "LARGE_LOG": 4499 if (ServerType.MASTER.equals(serverType)) { 4500 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4501 } 4502 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4503 case "BALANCER_DECISION": 4504 if (ServerType.REGION_SERVER.equals(serverType)) { 4505 throw new IllegalArgumentException( 4506 "Balancer Decision logs are not maintained by HRegionServer"); 4507 } 4508 return getBalancerDecisions(limit); 4509 case "BALANCER_REJECTION": 4510 if (ServerType.REGION_SERVER.equals(serverType)) { 4511 throw new IllegalArgumentException( 4512 "Balancer Rejection logs are not maintained by HRegionServer"); 4513 } 4514 return getBalancerRejections(limit); 4515 default: 4516 return CompletableFuture.completedFuture(Collections.emptyList()); 4517 } 4518 } 4519 4520 @Override 4521 public CompletableFuture<Void> flushMasterStore() { 4522 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4523 return this.<Void> newMasterCaller() 4524 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4525 Void> call(controller, stub, request.build(), 4526 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4527 .call(); 4528 } 4529 4530 @Override 4531 public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) { 4532 GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder(); 4533 return this.<List<String>> newAdminCaller() 4534 .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse, 4535 List<String>> adminCall(controller, stub, request.build(), 4536 (s, c, req, done) -> s.getCachedFilesList(c, req, done), 4537 resp -> resp.getCachedFilesList())) 4538 .serverName(serverName).call(); 4539 } 4540}