001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.net.URI; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.Set; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.function.BiConsumer; 043import java.util.function.Consumer; 044import java.util.function.Function; 045import java.util.function.Supplier; 046import java.util.regex.Pattern; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 052import org.apache.hadoop.hbase.CatalogFamilyFormat; 053import org.apache.hadoop.hbase.ClientMetaTableAccessor; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.HRegionLocation; 060import org.apache.hadoop.hbase.NamespaceDescriptor; 061import org.apache.hadoop.hbase.RegionLocations; 062import org.apache.hadoop.hbase.RegionMetrics; 063import org.apache.hadoop.hbase.RegionMetricsBuilder; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableExistsException; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotDisabledException; 068import org.apache.hadoop.hbase.TableNotEnabledException; 069import org.apache.hadoop.hbase.TableNotFoundException; 070import org.apache.hadoop.hbase.UnknownRegionException; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.Scan.ReadType; 075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 076import org.apache.hadoop.hbase.client.replication.TableCFs; 077import org.apache.hadoop.hbase.client.security.SecurityCapability; 078import org.apache.hadoop.hbase.exceptions.DeserializationException; 079import org.apache.hadoop.hbase.ipc.HBaseRpcController; 080import org.apache.hadoop.hbase.net.Address; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 086import org.apache.hadoop.hbase.replication.ReplicationException; 087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 089import org.apache.hadoop.hbase.replication.SyncReplicationState; 090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 092import org.apache.hadoop.hbase.security.access.Permission; 093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 094import org.apache.hadoop.hbase.security.access.UserPermission; 095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 100import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 101import org.apache.hadoop.hbase.util.Pair; 102import org.apache.hadoop.hbase.util.Strings; 103import org.apache.yetus.audience.InterfaceAudience; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 108import org.apache.hbase.thirdparty.com.google.protobuf.Message; 109import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 110import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 111import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 112import org.apache.hbase.thirdparty.io.netty.util.Timeout; 113import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 114import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 115 116import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 117import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 357import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 358 359/** 360 * The implementation of AsyncAdmin. 361 * <p> 362 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 363 * be finished inside the rpc framework thread, which means that the callbacks registered to the 364 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 365 * this class should not try to do time consuming tasks in the callbacks. 366 * @since 2.0.0 367 * @see AsyncHBaseAdmin 368 * @see AsyncConnection#getAdmin() 369 * @see AsyncConnection#getAdminBuilder() 370 */ 371@InterfaceAudience.Private 372class RawAsyncHBaseAdmin implements AsyncAdmin { 373 374 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 375 376 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 377 378 private final AsyncConnectionImpl connection; 379 380 private final HashedWheelTimer retryTimer; 381 382 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 383 384 private final long rpcTimeoutNs; 385 386 private final long operationTimeoutNs; 387 388 private final long pauseNs; 389 390 private final long pauseNsForServerOverloaded; 391 392 private final int maxAttempts; 393 394 private final int startLogErrorsCnt; 395 396 private final NonceGenerator ng; 397 398 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 399 AsyncAdminBuilderBase builder) { 400 this.connection = connection; 401 this.retryTimer = retryTimer; 402 this.metaTable = connection.getTable(META_TABLE_NAME); 403 this.rpcTimeoutNs = builder.rpcTimeoutNs; 404 this.operationTimeoutNs = builder.operationTimeoutNs; 405 this.pauseNs = builder.pauseNs; 406 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 407 LOG.warn( 408 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 409 + " the normal pause value {} ms, use the greater one instead", 410 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 411 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 412 this.pauseNsForServerOverloaded = builder.pauseNs; 413 } else { 414 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 415 } 416 this.maxAttempts = builder.maxAttempts; 417 this.startLogErrorsCnt = builder.startLogErrorsCnt; 418 this.ng = connection.getNonceGenerator(); 419 } 420 421 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 422 return this.connection.callerFactory.<T> masterRequest() 423 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 424 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 425 .pause(pauseNs, TimeUnit.NANOSECONDS) 426 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 427 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 428 } 429 430 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 431 return this.connection.callerFactory.<T> adminRequest() 432 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 433 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 434 .pause(pauseNs, TimeUnit.NANOSECONDS) 435 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 436 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 437 } 438 439 @FunctionalInterface 440 private interface MasterRpcCall<RESP, REQ> { 441 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 442 RpcCallback<RESP> done); 443 } 444 445 @FunctionalInterface 446 private interface AdminRpcCall<RESP, REQ> { 447 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 448 RpcCallback<RESP> done); 449 } 450 451 @FunctionalInterface 452 private interface Converter<D, S> { 453 D convert(S src) throws IOException; 454 } 455 456 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 457 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 458 Converter<RESP, PRESP> respConverter) { 459 CompletableFuture<RESP> future = new CompletableFuture<>(); 460 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 461 462 @Override 463 public void run(PRESP resp) { 464 if (controller.failed()) { 465 future.completeExceptionally(controller.getFailed()); 466 } else { 467 try { 468 future.complete(respConverter.convert(resp)); 469 } catch (IOException e) { 470 future.completeExceptionally(e); 471 } 472 } 473 } 474 }); 475 return future; 476 } 477 478 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 479 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 480 Converter<RESP, PRESP> respConverter) { 481 CompletableFuture<RESP> future = new CompletableFuture<>(); 482 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 483 484 @Override 485 public void run(PRESP resp) { 486 if (controller.failed()) { 487 future.completeExceptionally(controller.getFailed()); 488 } else { 489 try { 490 future.complete(respConverter.convert(resp)); 491 } catch (IOException e) { 492 future.completeExceptionally(e); 493 } 494 } 495 } 496 }); 497 return future; 498 } 499 500 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 501 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 502 ProcedureBiConsumer consumer) { 503 return procedureCall(b -> { 504 }, preq, rpcCall, respConverter, consumer); 505 } 506 507 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 508 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 509 ProcedureBiConsumer consumer) { 510 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 511 } 512 513 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 514 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 515 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 516 ProcedureBiConsumer consumer) { 517 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 518 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 519 prioritySetter.accept(builder); 520 CompletableFuture<Long> procFuture = builder.call(); 521 CompletableFuture<Void> future = waitProcedureResult(procFuture); 522 addListener(future, consumer); 523 return future; 524 } 525 526 @Override 527 public CompletableFuture<Boolean> tableExists(TableName tableName) { 528 if (TableName.isMetaTableName(tableName)) { 529 return CompletableFuture.completedFuture(true); 530 } 531 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 532 } 533 534 @Override 535 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 536 return getTableDescriptors( 537 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 538 } 539 540 /** 541 * {@link #listTableDescriptors(boolean)} 542 */ 543 @Override 544 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 545 boolean includeSysTables) { 546 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 547 + "use listTableDescriptors(boolean) instead"); 548 return getTableDescriptors( 549 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 550 } 551 552 @Override 553 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 554 Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, " 555 + "use listTableDescriptors(boolean) instead"); 556 if (tableNames.isEmpty()) { 557 return CompletableFuture.completedFuture(Collections.emptyList()); 558 } 559 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 560 } 561 562 private CompletableFuture<List<TableDescriptor>> 563 getTableDescriptors(GetTableDescriptorsRequest request) { 564 return this.<List<TableDescriptor>> newMasterCaller() 565 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 566 List<TableDescriptor>> call(controller, stub, request, 567 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 568 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 569 .call(); 570 } 571 572 @Override 573 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 574 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 575 } 576 577 @Override 578 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 579 boolean includeSysTables) { 580 Preconditions.checkNotNull(pattern, 581 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 582 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 583 } 584 585 @Override 586 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 587 return this.<List<TableName>> newMasterCaller() 588 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 589 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 590 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 591 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 592 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 593 .call(); 594 } 595 596 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 597 return this.<List<TableName>> newMasterCaller() 598 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 599 List<TableName>> call(controller, stub, request, 600 (s, c, req, done) -> s.getTableNames(c, req, done), 601 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 602 .call(); 603 } 604 605 @Override 606 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 607 return this.<List<TableDescriptor>> newMasterCaller() 608 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 609 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 610 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 611 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 612 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 613 .call(); 614 } 615 616 @Override 617 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 618 return this.<List<TableDescriptor>> newMasterCaller() 619 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 620 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 621 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 622 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 623 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 624 .call(); 625 } 626 627 @Override 628 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 629 return this.<List<TableName>> newMasterCaller() 630 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 631 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 632 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 633 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 634 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 635 .call(); 636 } 637 638 @Override 639 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 640 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 641 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 642 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 643 List<TableSchema>> call(controller, stub, 644 RequestConverter.buildGetTableDescriptorsRequest(tableName), 645 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 646 (resp) -> resp.getTableSchemaList())) 647 .call(), (tableSchemas, error) -> { 648 if (error != null) { 649 future.completeExceptionally(error); 650 return; 651 } 652 if (!tableSchemas.isEmpty()) { 653 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 654 } else { 655 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 656 } 657 }); 658 return future; 659 } 660 661 @Override 662 public CompletableFuture<Void> createTable(TableDescriptor desc) { 663 return createTable(desc.getTableName(), 664 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 665 } 666 667 @Override 668 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 669 int numRegions) { 670 try { 671 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 672 } catch (IllegalArgumentException e) { 673 return failedFuture(e); 674 } 675 } 676 677 @Override 678 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 679 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 680 + " use createTable(TableDescriptor) instead"); 681 try { 682 verifySplitKeys(splitKeys); 683 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 684 splitKeys, ng.getNonceGroup(), ng.newNonce())); 685 } catch (IllegalArgumentException e) { 686 return failedFuture(e); 687 } 688 } 689 690 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 691 Preconditions.checkNotNull(tableName, "table name is null"); 692 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 693 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 694 new CreateTableProcedureBiConsumer(tableName)); 695 } 696 697 @Override 698 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 699 return modifyTable(desc, true); 700 } 701 702 @Override 703 public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) { 704 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 705 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 706 ng.newNonce(), reopenRegions), 707 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 708 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 709 } 710 711 @Override 712 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 713 return this.<ModifyTableStoreFileTrackerRequest, 714 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 715 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 716 ng.getNonceGroup(), ng.newNonce()), 717 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 718 (resp) -> resp.getProcId(), 719 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 720 } 721 722 @Override 723 public CompletableFuture<Void> deleteTable(TableName tableName) { 724 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 725 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 726 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 727 new DeleteTableProcedureBiConsumer(tableName)); 728 } 729 730 @Override 731 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 732 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 733 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 734 ng.newNonce()), 735 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 736 new TruncateTableProcedureBiConsumer(tableName)); 737 } 738 739 @Override 740 public CompletableFuture<Void> enableTable(TableName tableName) { 741 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 742 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 743 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 744 new EnableTableProcedureBiConsumer(tableName)); 745 } 746 747 @Override 748 public CompletableFuture<Void> disableTable(TableName tableName) { 749 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 750 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 751 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 752 new DisableTableProcedureBiConsumer(tableName)); 753 } 754 755 /** 756 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 757 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 758 * targetState). 759 */ 760 private static CompletableFuture<Boolean> completeCheckTableState( 761 CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error, 762 TableState.State targetState, TableName tableName) { 763 if (error != null) { 764 future.completeExceptionally(error); 765 } else { 766 if (tableState.isPresent()) { 767 future.complete(tableState.get().inStates(targetState)); 768 } else { 769 future.completeExceptionally(new TableNotFoundException(tableName)); 770 } 771 } 772 return future; 773 } 774 775 @Override 776 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 777 if (TableName.isMetaTableName(tableName)) { 778 return CompletableFuture.completedFuture(true); 779 } 780 CompletableFuture<Boolean> future = new CompletableFuture<>(); 781 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 782 (tableState, error) -> { 783 completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName); 784 }); 785 return future; 786 } 787 788 @Override 789 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 790 if (TableName.isMetaTableName(tableName)) { 791 return CompletableFuture.completedFuture(false); 792 } 793 CompletableFuture<Boolean> future = new CompletableFuture<>(); 794 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 795 (tableState, error) -> { 796 completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName); 797 }); 798 return future; 799 } 800 801 @Override 802 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 803 if (TableName.isMetaTableName(tableName)) { 804 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 805 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 806 } 807 CompletableFuture<Boolean> future = new CompletableFuture<>(); 808 addListener(isTableEnabled(tableName), (enabled, error) -> { 809 if (error != null) { 810 if (error instanceof TableNotFoundException) { 811 future.complete(false); 812 } else { 813 future.completeExceptionally(error); 814 } 815 return; 816 } 817 if (!enabled) { 818 future.complete(false); 819 } else { 820 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 821 (locations, error1) -> { 822 if (error1 != null) { 823 future.completeExceptionally(error1); 824 return; 825 } 826 List<HRegionLocation> notDeployedRegions = locations.stream() 827 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 828 if (notDeployedRegions.size() > 0) { 829 if (LOG.isDebugEnabled()) { 830 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 831 } 832 future.complete(false); 833 return; 834 } 835 future.complete(true); 836 }); 837 } 838 }); 839 return future; 840 } 841 842 @Override 843 public CompletableFuture<Void> addColumnFamily(TableName tableName, 844 ColumnFamilyDescriptor columnFamily) { 845 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 846 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 847 ng.newNonce()), 848 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 849 new AddColumnFamilyProcedureBiConsumer(tableName)); 850 } 851 852 @Override 853 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 854 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 855 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 856 ng.newNonce()), 857 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 858 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 859 } 860 861 @Override 862 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 863 ColumnFamilyDescriptor columnFamily) { 864 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 865 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 866 ng.newNonce()), 867 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 868 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 869 } 870 871 @Override 872 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 873 byte[] family, String dstSFT) { 874 return this.<ModifyColumnStoreFileTrackerRequest, 875 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 876 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 877 ng.getNonceGroup(), ng.newNonce()), 878 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 879 (resp) -> resp.getProcId(), 880 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 881 } 882 883 @Override 884 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 885 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 886 RequestConverter.buildCreateNamespaceRequest(descriptor), 887 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 888 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 889 } 890 891 @Override 892 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 893 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 894 RequestConverter.buildModifyNamespaceRequest(descriptor), 895 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 896 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 897 } 898 899 @Override 900 public CompletableFuture<Void> deleteNamespace(String name) { 901 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 902 RequestConverter.buildDeleteNamespaceRequest(name), 903 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 904 new DeleteNamespaceProcedureBiConsumer(name)); 905 } 906 907 @Override 908 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 909 return this.<NamespaceDescriptor> newMasterCaller() 910 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 911 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 912 RequestConverter.buildGetNamespaceDescriptorRequest(name), 913 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 914 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 915 .call(); 916 } 917 918 @Override 919 public CompletableFuture<List<String>> listNamespaces() { 920 return this.<List<String>> newMasterCaller() 921 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 922 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 923 (s, c, req, done) -> s.listNamespaces(c, req, done), 924 (resp) -> resp.getNamespaceNameList())) 925 .call(); 926 } 927 928 @Override 929 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 930 return this.<List<NamespaceDescriptor>> newMasterCaller() 931 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 932 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 933 ListNamespaceDescriptorsRequest.newBuilder().build(), 934 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 935 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 936 .call(); 937 } 938 939 @Override 940 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 941 return this.<List<RegionInfo>> newAdminCaller() 942 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 943 List<RegionInfo>> adminCall(controller, stub, 944 RequestConverter.buildGetOnlineRegionRequest(), 945 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 946 resp -> ProtobufUtil.getRegionInfos(resp))) 947 .serverName(serverName).call(); 948 } 949 950 @Override 951 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 952 if (tableName.equals(META_TABLE_NAME)) { 953 return connection.registry.getMetaRegionLocations() 954 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 955 .collect(Collectors.toList())); 956 } else { 957 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 958 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 959 } 960 } 961 962 @Override 963 public CompletableFuture<Void> flush(TableName tableName) { 964 return flush(tableName, Collections.emptyList()); 965 } 966 967 @Override 968 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 969 Preconditions.checkNotNull(columnFamily, 970 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 971 return flush(tableName, Collections.singletonList(columnFamily)); 972 } 973 974 @Override 975 public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { 976 // This is for keeping compatibility with old implementation. 977 // If the server version is lower than the client version, it's possible that the 978 // flushTable method is not present in the server side, if so, we need to fall back 979 // to the old implementation. 980 Preconditions.checkNotNull(columnFamilyList, 981 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 982 List<byte[]> columnFamilies = columnFamilyList.stream() 983 .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); 984 FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, 985 ng.getNonceGroup(), ng.newNonce()); 986 CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( 987 tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), 988 (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); 989 CompletableFuture<Void> future = new CompletableFuture<>(); 990 addListener(procFuture, (ret, error) -> { 991 if (error != null) { 992 if ( 993 error instanceof TableNotFoundException || error instanceof TableNotEnabledException 994 || error instanceof NoSuchColumnFamilyException 995 ) { 996 future.completeExceptionally(error); 997 } else if (error instanceof DoNotRetryIOException) { 998 // usually this is caused by the method is not present on the server or 999 // the hbase hadoop version does not match the running hadoop version. 1000 // if that happens, we need fall back to the old flush implementation. 1001 LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error); 1002 legacyFlush(future, tableName, columnFamilies); 1003 } else { 1004 future.completeExceptionally(error); 1005 } 1006 } else { 1007 future.complete(ret); 1008 } 1009 }); 1010 return future; 1011 } 1012 1013 private void legacyFlush(CompletableFuture<Void> future, TableName tableName, 1014 List<byte[]> columnFamilies) { 1015 addListener(tableExists(tableName), (exists, err) -> { 1016 if (err != null) { 1017 future.completeExceptionally(err); 1018 } else if (!exists) { 1019 future.completeExceptionally(new TableNotFoundException(tableName)); 1020 } else { 1021 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 1022 if (err2 != null) { 1023 future.completeExceptionally(err2); 1024 } else if (!tableEnabled) { 1025 future.completeExceptionally(new TableNotEnabledException(tableName)); 1026 } else { 1027 Map<String, String> props = new HashMap<>(); 1028 if (columnFamilies != null && !columnFamilies.isEmpty()) { 1029 props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER 1030 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); 1031 } 1032 addListener( 1033 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 1034 (ret, err3) -> { 1035 if (err3 != null) { 1036 future.completeExceptionally(err3); 1037 } else { 1038 future.complete(ret); 1039 } 1040 }); 1041 } 1042 }); 1043 } 1044 }); 1045 } 1046 1047 @Override 1048 public CompletableFuture<Void> flushRegion(byte[] regionName) { 1049 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 1050 }); 1051 } 1052 1053 @Override 1054 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 1055 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1056 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 1057 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 1058 }); 1059 } 1060 1061 /** 1062 * This method is for internal use only, where we need the response of the flush. 1063 * <p/> 1064 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1065 * API. 1066 */ 1067 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1068 boolean writeFlushWALMarker) { 1069 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1070 addListener(getRegionLocation(regionName), (location, err) -> { 1071 if (err != null) { 1072 future.completeExceptionally(err); 1073 return; 1074 } 1075 ServerName serverName = location.getServerName(); 1076 if (serverName == null) { 1077 future 1078 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1079 return; 1080 } 1081 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1082 (ret, err2) -> { 1083 if (err2 != null) { 1084 future.completeExceptionally(err2); 1085 } else { 1086 future.complete(ret); 1087 } 1088 }); 1089 }); 1090 return future; 1091 } 1092 1093 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1094 byte[] columnFamily, boolean writeFlushWALMarker) { 1095 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1096 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1097 FlushRegionResponse> adminCall(controller, stub, 1098 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1099 writeFlushWALMarker), 1100 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1101 .call(); 1102 } 1103 1104 @Override 1105 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1106 CompletableFuture<Void> future = new CompletableFuture<>(); 1107 addListener(getRegions(sn), (hRegionInfos, err) -> { 1108 if (err != null) { 1109 future.completeExceptionally(err); 1110 return; 1111 } 1112 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1113 if (hRegionInfos != null) { 1114 hRegionInfos 1115 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1116 }))); 1117 } 1118 addListener(CompletableFuture.allOf( 1119 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1120 if (err2 != null) { 1121 future.completeExceptionally(err2); 1122 } else { 1123 future.complete(ret); 1124 } 1125 }); 1126 }); 1127 return future; 1128 } 1129 1130 @Override 1131 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1132 return compact(tableName, null, false, compactType); 1133 } 1134 1135 @Override 1136 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1137 CompactType compactType) { 1138 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1139 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1140 return compact(tableName, columnFamily, false, compactType); 1141 } 1142 1143 @Override 1144 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1145 return compactRegion(regionName, null, false); 1146 } 1147 1148 @Override 1149 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1150 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1151 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1152 return compactRegion(regionName, columnFamily, false); 1153 } 1154 1155 @Override 1156 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1157 return compact(tableName, null, true, compactType); 1158 } 1159 1160 @Override 1161 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1162 CompactType compactType) { 1163 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1164 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1165 return compact(tableName, columnFamily, true, compactType); 1166 } 1167 1168 @Override 1169 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1170 return compactRegion(regionName, null, true); 1171 } 1172 1173 @Override 1174 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1175 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1176 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1177 return compactRegion(regionName, columnFamily, true); 1178 } 1179 1180 @Override 1181 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1182 return compactRegionServer(sn, false); 1183 } 1184 1185 @Override 1186 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1187 return compactRegionServer(sn, true); 1188 } 1189 1190 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1191 CompletableFuture<Void> future = new CompletableFuture<>(); 1192 addListener(getRegions(sn), (hRegionInfos, err) -> { 1193 if (err != null) { 1194 future.completeExceptionally(err); 1195 return; 1196 } 1197 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1198 if (hRegionInfos != null) { 1199 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1200 } 1201 addListener(CompletableFuture.allOf( 1202 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1203 if (err2 != null) { 1204 future.completeExceptionally(err2); 1205 } else { 1206 future.complete(ret); 1207 } 1208 }); 1209 }); 1210 return future; 1211 } 1212 1213 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1214 boolean major) { 1215 CompletableFuture<Void> future = new CompletableFuture<>(); 1216 addListener(getRegionLocation(regionName), (location, err) -> { 1217 if (err != null) { 1218 future.completeExceptionally(err); 1219 return; 1220 } 1221 ServerName serverName = location.getServerName(); 1222 if (serverName == null) { 1223 future 1224 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1225 return; 1226 } 1227 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1228 (ret, err2) -> { 1229 if (err2 != null) { 1230 future.completeExceptionally(err2); 1231 } else { 1232 future.complete(ret); 1233 } 1234 }); 1235 }); 1236 return future; 1237 } 1238 1239 /** 1240 * List all region locations for the specific table. 1241 */ 1242 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1243 if (TableName.META_TABLE_NAME.equals(tableName)) { 1244 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1245 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1246 if (err != null) { 1247 future.completeExceptionally(err); 1248 } else if ( 1249 metaRegions == null || metaRegions.isEmpty() 1250 || metaRegions.getDefaultRegionLocation() == null 1251 ) { 1252 future.completeExceptionally(new IOException("meta region does not found")); 1253 } else { 1254 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1255 } 1256 }); 1257 return future; 1258 } else { 1259 // For non-meta table, we fetch all locations by scanning hbase:meta table 1260 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1261 } 1262 } 1263 1264 /** 1265 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1266 */ 1267 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1268 CompactType compactType) { 1269 CompletableFuture<Void> future = new CompletableFuture<>(); 1270 1271 switch (compactType) { 1272 case MOB: 1273 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1274 if (err != null) { 1275 future.completeExceptionally(err); 1276 return; 1277 } 1278 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1279 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1280 if (err2 != null) { 1281 future.completeExceptionally(err2); 1282 } else { 1283 future.complete(ret); 1284 } 1285 }); 1286 }); 1287 break; 1288 case NORMAL: 1289 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1290 if (err != null) { 1291 future.completeExceptionally(err); 1292 return; 1293 } 1294 if (locations == null || locations.isEmpty()) { 1295 future.completeExceptionally(new TableNotFoundException(tableName)); 1296 } 1297 CompletableFuture<?>[] compactFutures = 1298 locations.stream().filter(l -> l.getRegion() != null) 1299 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1300 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1301 .toArray(CompletableFuture<?>[]::new); 1302 // future complete unless all of the compact futures are completed. 1303 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1304 if (err2 != null) { 1305 future.completeExceptionally(err2); 1306 } else { 1307 future.complete(ret); 1308 } 1309 }); 1310 }); 1311 break; 1312 default: 1313 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1314 } 1315 return future; 1316 } 1317 1318 /** 1319 * Compact the region at specific region server. 1320 */ 1321 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1322 final boolean major, byte[] columnFamily) { 1323 return this.<Void> newAdminCaller().serverName(sn) 1324 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1325 Void> adminCall(controller, stub, 1326 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1327 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1328 .call(); 1329 } 1330 1331 private byte[] toEncodeRegionName(byte[] regionName) { 1332 return RegionInfo.isEncodedRegionName(regionName) 1333 ? regionName 1334 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1335 } 1336 1337 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1338 CompletableFuture<TableName> result) { 1339 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1340 if (err != null) { 1341 result.completeExceptionally(err); 1342 return; 1343 } 1344 RegionInfo regionInfo = location.getRegion(); 1345 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1346 result.completeExceptionally( 1347 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1348 return; 1349 } 1350 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1351 if (!tableName.get().equals(regionInfo.getTable())) { 1352 // tables of this two region should be same. 1353 result.completeExceptionally( 1354 new IllegalArgumentException("Cannot merge regions from two different tables " 1355 + tableName.get() + " and " + regionInfo.getTable())); 1356 } else { 1357 result.complete(tableName.get()); 1358 } 1359 } 1360 }); 1361 } 1362 1363 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1364 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1365 CompletableFuture<TableName> future = new CompletableFuture<>(); 1366 for (byte[] encodedRegionName : encodedRegionNames) { 1367 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1368 } 1369 return future; 1370 } 1371 1372 @Override 1373 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1374 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1375 } 1376 1377 @Override 1378 public CompletableFuture<Boolean> isMergeEnabled() { 1379 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1380 } 1381 1382 @Override 1383 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1384 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1385 } 1386 1387 @Override 1388 public CompletableFuture<Boolean> isSplitEnabled() { 1389 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1390 } 1391 1392 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1393 MasterSwitchType switchType) { 1394 SetSplitOrMergeEnabledRequest request = 1395 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1396 return this.<Boolean> newMasterCaller() 1397 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1398 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1399 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1400 (resp) -> resp.getPrevValueList().get(0))) 1401 .call(); 1402 } 1403 1404 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1405 IsSplitOrMergeEnabledRequest request = 1406 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1407 return this.<Boolean> newMasterCaller() 1408 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1409 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1410 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1411 .call(); 1412 } 1413 1414 @Override 1415 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1416 if (nameOfRegionsToMerge.size() < 2) { 1417 return failedFuture(new IllegalArgumentException( 1418 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1419 } 1420 CompletableFuture<Void> future = new CompletableFuture<>(); 1421 byte[][] encodedNameOfRegionsToMerge = 1422 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1423 1424 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1425 if (err != null) { 1426 future.completeExceptionally(err); 1427 return; 1428 } 1429 1430 final MergeTableRegionsRequest request; 1431 try { 1432 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1433 forcible, ng.getNonceGroup(), ng.newNonce()); 1434 } catch (DeserializationException e) { 1435 future.completeExceptionally(e); 1436 return; 1437 } 1438 1439 addListener( 1440 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1441 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1442 (ret, err2) -> { 1443 if (err2 != null) { 1444 future.completeExceptionally(err2); 1445 } else { 1446 future.complete(ret); 1447 } 1448 }); 1449 }); 1450 return future; 1451 } 1452 1453 @Override 1454 public CompletableFuture<Void> split(TableName tableName) { 1455 CompletableFuture<Void> future = new CompletableFuture<>(); 1456 addListener(tableExists(tableName), (exist, error) -> { 1457 if (error != null) { 1458 future.completeExceptionally(error); 1459 return; 1460 } 1461 if (!exist) { 1462 future.completeExceptionally(new TableNotFoundException(tableName)); 1463 return; 1464 } 1465 addListener(metaTable 1466 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1467 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1468 ClientMetaTableAccessor.QueryType.REGION)) 1469 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1470 ClientMetaTableAccessor.QueryType.REGION))), 1471 (results, err2) -> { 1472 if (err2 != null) { 1473 future.completeExceptionally(err2); 1474 return; 1475 } 1476 if (results != null && !results.isEmpty()) { 1477 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1478 for (Result r : results) { 1479 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1480 continue; 1481 } 1482 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1483 if (rl != null) { 1484 for (HRegionLocation h : rl.getRegionLocations()) { 1485 if (h != null && h.getServerName() != null) { 1486 RegionInfo hri = h.getRegion(); 1487 if ( 1488 hri == null || hri.isSplitParent() 1489 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1490 ) { 1491 continue; 1492 } 1493 splitFutures.add(split(hri, null)); 1494 } 1495 } 1496 } 1497 } 1498 addListener( 1499 CompletableFuture 1500 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1501 (ret, exception) -> { 1502 if (exception != null) { 1503 future.completeExceptionally(exception); 1504 return; 1505 } 1506 future.complete(ret); 1507 }); 1508 } else { 1509 future.complete(null); 1510 } 1511 }); 1512 }); 1513 return future; 1514 } 1515 1516 @Override 1517 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1518 CompletableFuture<Void> result = new CompletableFuture<>(); 1519 if (splitPoint == null) { 1520 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1521 } 1522 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1523 (loc, err) -> { 1524 if (err != null) { 1525 result.completeExceptionally(err); 1526 } else if (loc == null || loc.getRegion() == null) { 1527 result.completeExceptionally(new IllegalArgumentException( 1528 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1529 } else { 1530 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1531 if (err2 != null) { 1532 result.completeExceptionally(err2); 1533 } else { 1534 result.complete(ret); 1535 } 1536 1537 }); 1538 } 1539 }); 1540 return result; 1541 } 1542 1543 @Override 1544 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1545 CompletableFuture<Void> future = new CompletableFuture<>(); 1546 addListener(getRegionLocation(regionName), (location, err) -> { 1547 if (err != null) { 1548 future.completeExceptionally(err); 1549 return; 1550 } 1551 RegionInfo regionInfo = location.getRegion(); 1552 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1553 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1554 + "Replicas are auto-split when their primary is split.")); 1555 return; 1556 } 1557 ServerName serverName = location.getServerName(); 1558 if (serverName == null) { 1559 future 1560 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1561 return; 1562 } 1563 addListener(split(regionInfo, null), (ret, err2) -> { 1564 if (err2 != null) { 1565 future.completeExceptionally(err2); 1566 } else { 1567 future.complete(ret); 1568 } 1569 }); 1570 }); 1571 return future; 1572 } 1573 1574 @Override 1575 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1576 Preconditions.checkNotNull(splitPoint, 1577 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1578 CompletableFuture<Void> future = new CompletableFuture<>(); 1579 addListener(getRegionLocation(regionName), (location, err) -> { 1580 if (err != null) { 1581 future.completeExceptionally(err); 1582 return; 1583 } 1584 RegionInfo regionInfo = location.getRegion(); 1585 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1586 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1587 + "Replicas are auto-split when their primary is split.")); 1588 return; 1589 } 1590 ServerName serverName = location.getServerName(); 1591 if (serverName == null) { 1592 future 1593 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1594 return; 1595 } 1596 if ( 1597 regionInfo.getStartKey() != null 1598 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1599 ) { 1600 future.completeExceptionally( 1601 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1602 return; 1603 } 1604 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1605 if (err2 != null) { 1606 future.completeExceptionally(err2); 1607 } else { 1608 future.complete(ret); 1609 } 1610 }); 1611 }); 1612 return future; 1613 } 1614 1615 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1616 CompletableFuture<Void> future = new CompletableFuture<>(); 1617 TableName tableName = hri.getTable(); 1618 final SplitTableRegionRequest request; 1619 try { 1620 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1621 ng.newNonce()); 1622 } catch (DeserializationException e) { 1623 future.completeExceptionally(e); 1624 return future; 1625 } 1626 1627 addListener( 1628 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1629 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1630 (ret, err2) -> { 1631 if (err2 != null) { 1632 future.completeExceptionally(err2); 1633 } else { 1634 future.complete(ret); 1635 } 1636 }); 1637 return future; 1638 } 1639 1640 @Override 1641 public CompletableFuture<Void> truncateRegion(byte[] regionName) { 1642 CompletableFuture<Void> future = new CompletableFuture<>(); 1643 addListener(getRegionLocation(regionName), (location, err) -> { 1644 if (err != null) { 1645 future.completeExceptionally(err); 1646 return; 1647 } 1648 RegionInfo regionInfo = location.getRegion(); 1649 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1650 future.completeExceptionally(new IllegalArgumentException( 1651 "Can't truncate replicas directly.Replicas are auto-truncated " 1652 + "when their primary is truncated.")); 1653 return; 1654 } 1655 ServerName serverName = location.getServerName(); 1656 if (serverName == null) { 1657 future 1658 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1659 return; 1660 } 1661 addListener(truncateRegion(regionInfo), (ret, err2) -> { 1662 if (err2 != null) { 1663 future.completeExceptionally(err2); 1664 } else { 1665 future.complete(ret); 1666 } 1667 }); 1668 }); 1669 return future; 1670 } 1671 1672 private CompletableFuture<Void> truncateRegion(final RegionInfo hri) { 1673 CompletableFuture<Void> future = new CompletableFuture<>(); 1674 TableName tableName = hri.getTable(); 1675 final MasterProtos.TruncateRegionRequest request; 1676 try { 1677 request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce()); 1678 } catch (DeserializationException e) { 1679 future.completeExceptionally(e); 1680 return future; 1681 } 1682 addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion, 1683 MasterProtos.TruncateRegionResponse::getProcId, 1684 new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1685 if (err2 != null) { 1686 future.completeExceptionally(err2); 1687 } else { 1688 future.complete(ret); 1689 } 1690 }); 1691 return future; 1692 } 1693 1694 @Override 1695 public CompletableFuture<Void> assign(byte[] regionName) { 1696 CompletableFuture<Void> future = new CompletableFuture<>(); 1697 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1698 if (err != null) { 1699 future.completeExceptionally(err); 1700 return; 1701 } 1702 addListener( 1703 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1704 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1705 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1706 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1707 .call(), 1708 (ret, err2) -> { 1709 if (err2 != null) { 1710 future.completeExceptionally(err2); 1711 } else { 1712 future.complete(ret); 1713 } 1714 }); 1715 }); 1716 return future; 1717 } 1718 1719 @Override 1720 public CompletableFuture<Void> unassign(byte[] regionName) { 1721 CompletableFuture<Void> future = new CompletableFuture<>(); 1722 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1723 if (err != null) { 1724 future.completeExceptionally(err); 1725 return; 1726 } 1727 addListener( 1728 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1729 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1730 Void> call(controller, stub, 1731 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1732 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1733 .call(), 1734 (ret, err2) -> { 1735 if (err2 != null) { 1736 future.completeExceptionally(err2); 1737 } else { 1738 future.complete(ret); 1739 } 1740 }); 1741 }); 1742 return future; 1743 } 1744 1745 @Override 1746 public CompletableFuture<Void> offline(byte[] regionName) { 1747 CompletableFuture<Void> future = new CompletableFuture<>(); 1748 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1749 if (err != null) { 1750 future.completeExceptionally(err); 1751 return; 1752 } 1753 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1754 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1755 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1756 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1757 .call(), (ret, err2) -> { 1758 if (err2 != null) { 1759 future.completeExceptionally(err2); 1760 } else { 1761 future.complete(ret); 1762 } 1763 }); 1764 }); 1765 return future; 1766 } 1767 1768 @Override 1769 public CompletableFuture<Void> move(byte[] regionName) { 1770 CompletableFuture<Void> future = new CompletableFuture<>(); 1771 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1772 if (err != null) { 1773 future.completeExceptionally(err); 1774 return; 1775 } 1776 addListener( 1777 moveRegion(regionInfo, 1778 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1779 (ret, err2) -> { 1780 if (err2 != null) { 1781 future.completeExceptionally(err2); 1782 } else { 1783 future.complete(ret); 1784 } 1785 }); 1786 }); 1787 return future; 1788 } 1789 1790 @Override 1791 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1792 Preconditions.checkNotNull(destServerName, 1793 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1794 CompletableFuture<Void> future = new CompletableFuture<>(); 1795 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1796 if (err != null) { 1797 future.completeExceptionally(err); 1798 return; 1799 } 1800 addListener( 1801 moveRegion(regionInfo, RequestConverter 1802 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1803 (ret, err2) -> { 1804 if (err2 != null) { 1805 future.completeExceptionally(err2); 1806 } else { 1807 future.complete(ret); 1808 } 1809 }); 1810 }); 1811 return future; 1812 } 1813 1814 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1815 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1816 .action( 1817 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1818 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1819 .call(); 1820 } 1821 1822 @Override 1823 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1824 return this.<Void> newMasterCaller() 1825 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1826 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1827 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1828 .call(); 1829 } 1830 1831 @Override 1832 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1833 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1834 Scan scan = QuotaTableUtil.makeScan(filter); 1835 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1836 new AdvancedScanResultConsumer() { 1837 List<QuotaSettings> settings = new ArrayList<>(); 1838 1839 @Override 1840 public void onNext(Result[] results, ScanController controller) { 1841 for (Result result : results) { 1842 try { 1843 QuotaTableUtil.parseResultToCollection(result, settings); 1844 } catch (IOException e) { 1845 controller.terminate(); 1846 future.completeExceptionally(e); 1847 } 1848 } 1849 } 1850 1851 @Override 1852 public void onError(Throwable error) { 1853 future.completeExceptionally(error); 1854 } 1855 1856 @Override 1857 public void onComplete() { 1858 future.complete(settings); 1859 } 1860 }); 1861 return future; 1862 } 1863 1864 @Override 1865 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1866 boolean enabled) { 1867 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1868 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1869 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1870 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1871 } 1872 1873 @Override 1874 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1875 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1876 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1877 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1878 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1879 } 1880 1881 @Override 1882 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1883 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1884 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1885 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1886 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1887 } 1888 1889 @Override 1890 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1891 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1892 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1893 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1894 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1895 } 1896 1897 @Override 1898 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1899 return this.<ReplicationPeerConfig> newMasterCaller() 1900 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1901 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1902 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1903 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1904 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1905 .call(); 1906 } 1907 1908 @Override 1909 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1910 ReplicationPeerConfig peerConfig) { 1911 return this.<UpdateReplicationPeerConfigRequest, 1912 UpdateReplicationPeerConfigResponse> procedureCall( 1913 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1914 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1915 (resp) -> resp.getProcId(), 1916 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1917 } 1918 1919 @Override 1920 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1921 SyncReplicationState clusterState) { 1922 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1923 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1924 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1925 clusterState), 1926 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1927 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1928 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1929 } 1930 1931 @Override 1932 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1933 Map<TableName, List<String>> tableCfs) { 1934 if (tableCfs == null) { 1935 return failedFuture(new ReplicationException("tableCfs is null")); 1936 } 1937 1938 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1939 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1940 if (!completeExceptionally(future, error)) { 1941 ReplicationPeerConfig newPeerConfig = 1942 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1943 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1944 if (!completeExceptionally(future, error)) { 1945 future.complete(result); 1946 } 1947 }); 1948 } 1949 }); 1950 return future; 1951 } 1952 1953 @Override 1954 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1955 Map<TableName, List<String>> tableCfs) { 1956 if (tableCfs == null) { 1957 return failedFuture(new ReplicationException("tableCfs is null")); 1958 } 1959 1960 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1961 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1962 if (!completeExceptionally(future, error)) { 1963 ReplicationPeerConfig newPeerConfig = null; 1964 try { 1965 newPeerConfig = ReplicationPeerConfigUtil 1966 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1967 } catch (ReplicationException e) { 1968 future.completeExceptionally(e); 1969 return; 1970 } 1971 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1972 if (!completeExceptionally(future, error)) { 1973 future.complete(result); 1974 } 1975 }); 1976 } 1977 }); 1978 return future; 1979 } 1980 1981 @Override 1982 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1983 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1984 } 1985 1986 @Override 1987 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1988 Preconditions.checkNotNull(pattern, 1989 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1990 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1991 } 1992 1993 private CompletableFuture<List<ReplicationPeerDescription>> 1994 listReplicationPeers(ListReplicationPeersRequest request) { 1995 return this.<List<ReplicationPeerDescription>> newMasterCaller() 1996 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1997 List<ReplicationPeerDescription>> call(controller, stub, request, 1998 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1999 (resp) -> resp.getPeerDescList().stream() 2000 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 2001 .collect(Collectors.toList()))) 2002 .call(); 2003 } 2004 2005 @Override 2006 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 2007 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 2008 addListener(listTableDescriptors(), (tables, error) -> { 2009 if (!completeExceptionally(future, error)) { 2010 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 2011 tables.forEach(table -> { 2012 Map<String, Integer> cfs = new HashMap<>(); 2013 Stream.of(table.getColumnFamilies()) 2014 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 2015 .forEach(column -> { 2016 cfs.put(column.getNameAsString(), column.getScope()); 2017 }); 2018 if (!cfs.isEmpty()) { 2019 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 2020 } 2021 }); 2022 future.complete(replicatedTableCFs); 2023 } 2024 }); 2025 return future; 2026 } 2027 2028 @Override 2029 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 2030 SnapshotProtos.SnapshotDescription snapshot = 2031 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2032 try { 2033 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2034 } catch (IllegalArgumentException e) { 2035 return failedFuture(e); 2036 } 2037 CompletableFuture<Void> future = new CompletableFuture<>(); 2038 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2039 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 2040 addListener(this.<SnapshotResponse> newMasterCaller() 2041 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 2042 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 2043 .call(), (resp, err) -> { 2044 if (err != null) { 2045 future.completeExceptionally(err); 2046 return; 2047 } 2048 waitSnapshotFinish(snapshotDesc, future, resp); 2049 }); 2050 return future; 2051 } 2052 2053 // This is for keeping compatibility with old implementation. 2054 // If there is a procId field in the response, then the snapshot will be operated with a 2055 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 2056 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 2057 SnapshotResponse resp) { 2058 if (resp.hasProcId()) { 2059 getProcedureResult(resp.getProcId(), future, 0); 2060 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 2061 } else { 2062 long expectedTimeout = resp.getExpectedTimeout(); 2063 TimerTask pollingTask = new TimerTask() { 2064 int tries = 0; 2065 long startTime = EnvironmentEdgeManager.currentTime(); 2066 long endTime = startTime + expectedTimeout; 2067 long maxPauseTime = expectedTimeout / maxAttempts; 2068 2069 @Override 2070 public void run(Timeout timeout) throws Exception { 2071 if (EnvironmentEdgeManager.currentTime() < endTime) { 2072 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 2073 if (err2 != null) { 2074 future.completeExceptionally(err2); 2075 } else if (done) { 2076 future.complete(null); 2077 } else { 2078 // retry again after pauseTime. 2079 long pauseTime = 2080 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2081 pauseTime = Math.min(pauseTime, maxPauseTime); 2082 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 2083 } 2084 }); 2085 } else { 2086 future 2087 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 2088 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 2089 } 2090 } 2091 }; 2092 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2093 } 2094 } 2095 2096 @Override 2097 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 2098 return this.<Boolean> newMasterCaller() 2099 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 2100 Boolean> call(controller, stub, 2101 IsSnapshotDoneRequest.newBuilder() 2102 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2103 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 2104 .call(); 2105 } 2106 2107 @Override 2108 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 2109 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2110 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2111 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2112 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2113 } 2114 2115 @Override 2116 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2117 boolean restoreAcl) { 2118 CompletableFuture<Void> future = new CompletableFuture<>(); 2119 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2120 if (err != null) { 2121 future.completeExceptionally(err); 2122 return; 2123 } 2124 TableName tableName = null; 2125 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2126 for (SnapshotDescription snap : snapshotDescriptions) { 2127 if (snap.getName().equals(snapshotName)) { 2128 tableName = snap.getTableName(); 2129 break; 2130 } 2131 } 2132 } 2133 if (tableName == null) { 2134 future.completeExceptionally( 2135 new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist.")); 2136 return; 2137 } 2138 final TableName finalTableName = tableName; 2139 addListener(tableExists(finalTableName), (exists, err2) -> { 2140 if (err2 != null) { 2141 future.completeExceptionally(err2); 2142 } else if (!exists) { 2143 // if table does not exist, then just clone snapshot into new table. 2144 completeConditionalOnFuture(future, 2145 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2146 } else { 2147 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2148 if (err4 != null) { 2149 future.completeExceptionally(err4); 2150 } else if (!disabled) { 2151 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2152 } else { 2153 completeConditionalOnFuture(future, 2154 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2155 } 2156 }); 2157 } 2158 }); 2159 }); 2160 return future; 2161 } 2162 2163 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2164 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2165 if (takeFailSafeSnapshot) { 2166 CompletableFuture<Void> future = new CompletableFuture<>(); 2167 // Step.1 Take a snapshot of the current state 2168 String failSafeSnapshotSnapshotNameFormat = 2169 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2170 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2171 final String failSafeSnapshotSnapshotName = 2172 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2173 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2174 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2175 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2176 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2177 if (err != null) { 2178 future.completeExceptionally(err); 2179 } else { 2180 // Step.2 Restore snapshot 2181 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2182 (void2, err2) -> { 2183 if (err2 != null) { 2184 // Step.3.a Something went wrong during the restore and try to rollback. 2185 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2186 restoreAcl, null), (void3, err3) -> { 2187 if (err3 != null) { 2188 future.completeExceptionally(err3); 2189 } else { 2190 // If fail to restore snapshot but rollback successfully, delete the 2191 // restore-failsafe snapshot. 2192 LOG.info( 2193 "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2194 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> { 2195 if (err4 != null) { 2196 LOG.error("Unable to remove the failsafe snapshot: {}", 2197 failSafeSnapshotSnapshotName, err4); 2198 } 2199 String msg = 2200 "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot=" 2201 + failSafeSnapshotSnapshotName + " succeeded."; 2202 LOG.error(msg); 2203 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2204 }); 2205 } 2206 }); 2207 } else { 2208 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 2209 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2210 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2211 if (err3 != null) { 2212 LOG.error( 2213 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2214 err3); 2215 } 2216 future.complete(ret3); 2217 }); 2218 } 2219 }); 2220 } 2221 }); 2222 return future; 2223 } else { 2224 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2225 } 2226 } 2227 2228 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2229 CompletableFuture<T> parentFuture) { 2230 addListener(parentFuture, (res, err) -> { 2231 if (err != null) { 2232 dependentFuture.completeExceptionally(err); 2233 } else { 2234 dependentFuture.complete(res); 2235 } 2236 }); 2237 } 2238 2239 @Override 2240 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2241 boolean restoreAcl, String customSFT) { 2242 CompletableFuture<Void> future = new CompletableFuture<>(); 2243 addListener(tableExists(tableName), (exists, err) -> { 2244 if (err != null) { 2245 future.completeExceptionally(err); 2246 } else if (exists) { 2247 future.completeExceptionally(new TableExistsException(tableName)); 2248 } else { 2249 completeConditionalOnFuture(future, 2250 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2251 } 2252 }); 2253 return future; 2254 } 2255 2256 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2257 boolean restoreAcl, String customSFT) { 2258 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2259 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2260 try { 2261 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2262 } catch (IllegalArgumentException e) { 2263 return failedFuture(e); 2264 } 2265 RestoreSnapshotRequest.Builder builder = 2266 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2267 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2268 if (customSFT != null) { 2269 builder.setCustomSFT(customSFT); 2270 } 2271 return waitProcedureResult(this.<Long> newMasterCaller() 2272 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2273 Long> call(controller, stub, builder.build(), 2274 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2275 .call()); 2276 } 2277 2278 @Override 2279 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2280 return getCompletedSnapshots(null); 2281 } 2282 2283 @Override 2284 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2285 Preconditions.checkNotNull(pattern, 2286 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2287 return getCompletedSnapshots(pattern); 2288 } 2289 2290 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2291 return this.<List<SnapshotDescription>> newMasterCaller() 2292 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2293 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2294 GetCompletedSnapshotsRequest.newBuilder().build(), 2295 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2296 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2297 .call(); 2298 } 2299 2300 @Override 2301 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2302 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2303 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2304 return getCompletedSnapshots(tableNamePattern, null); 2305 } 2306 2307 @Override 2308 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2309 Pattern snapshotNamePattern) { 2310 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2311 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2312 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2313 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2314 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2315 } 2316 2317 private CompletableFuture<List<SnapshotDescription>> 2318 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2319 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2320 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2321 if (err != null) { 2322 future.completeExceptionally(err); 2323 return; 2324 } 2325 if (tableNames == null || tableNames.size() <= 0) { 2326 future.complete(Collections.emptyList()); 2327 return; 2328 } 2329 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2330 if (err2 != null) { 2331 future.completeExceptionally(err2); 2332 return; 2333 } 2334 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2335 future.complete(Collections.emptyList()); 2336 return; 2337 } 2338 future.complete(snapshotDescList.stream() 2339 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2340 .collect(Collectors.toList())); 2341 }); 2342 }); 2343 return future; 2344 } 2345 2346 @Override 2347 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2348 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2349 } 2350 2351 @Override 2352 public CompletableFuture<Void> deleteSnapshots() { 2353 return internalDeleteSnapshots(null, null); 2354 } 2355 2356 @Override 2357 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2358 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2359 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2360 return internalDeleteSnapshots(null, snapshotNamePattern); 2361 } 2362 2363 @Override 2364 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2365 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2366 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2367 return internalDeleteSnapshots(tableNamePattern, null); 2368 } 2369 2370 @Override 2371 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2372 Pattern snapshotNamePattern) { 2373 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2374 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2375 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2376 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2377 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2378 } 2379 2380 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2381 Pattern snapshotNamePattern) { 2382 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2383 if (tableNamePattern == null) { 2384 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2385 } else { 2386 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2387 } 2388 CompletableFuture<Void> future = new CompletableFuture<>(); 2389 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2390 if (err != null) { 2391 future.completeExceptionally(err); 2392 return; 2393 } 2394 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2395 future.complete(null); 2396 return; 2397 } 2398 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2399 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2400 if (e != null) { 2401 future.completeExceptionally(e); 2402 } else { 2403 future.complete(v); 2404 } 2405 }); 2406 }); 2407 return future; 2408 } 2409 2410 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2411 return this.<Void> newMasterCaller() 2412 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2413 controller, stub, 2414 DeleteSnapshotRequest.newBuilder() 2415 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2416 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2417 .call(); 2418 } 2419 2420 @Override 2421 public CompletableFuture<Void> execProcedure(String signature, String instance, 2422 Map<String, String> props) { 2423 CompletableFuture<Void> future = new CompletableFuture<>(); 2424 ProcedureDescription procDesc = 2425 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2426 addListener(this.<Long> newMasterCaller() 2427 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2428 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2429 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2430 .call(), (expectedTimeout, err) -> { 2431 if (err != null) { 2432 future.completeExceptionally(err); 2433 return; 2434 } 2435 TimerTask pollingTask = new TimerTask() { 2436 int tries = 0; 2437 long startTime = EnvironmentEdgeManager.currentTime(); 2438 long endTime = startTime + expectedTimeout; 2439 long maxPauseTime = expectedTimeout / maxAttempts; 2440 2441 @Override 2442 public void run(Timeout timeout) throws Exception { 2443 if (EnvironmentEdgeManager.currentTime() < endTime) { 2444 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2445 if (err2 != null) { 2446 future.completeExceptionally(err2); 2447 return; 2448 } 2449 if (done) { 2450 future.complete(null); 2451 } else { 2452 // retry again after pauseTime. 2453 long pauseTime = 2454 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2455 pauseTime = Math.min(pauseTime, maxPauseTime); 2456 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2457 TimeUnit.MICROSECONDS); 2458 } 2459 }); 2460 } else { 2461 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2462 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2463 } 2464 } 2465 }; 2466 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2467 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2468 }); 2469 return future; 2470 } 2471 2472 @Override 2473 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2474 Map<String, String> props) { 2475 ProcedureDescription proDesc = 2476 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2477 return this.<byte[]> newMasterCaller() 2478 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2479 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2480 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2481 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2482 .call(); 2483 } 2484 2485 @Override 2486 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2487 Map<String, String> props) { 2488 ProcedureDescription proDesc = 2489 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2490 return this.<Boolean> newMasterCaller() 2491 .action( 2492 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2493 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2494 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2495 .call(); 2496 } 2497 2498 @Override 2499 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2500 return this.<Boolean> newMasterCaller().action( 2501 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2502 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2503 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2504 .call(); 2505 } 2506 2507 @Override 2508 public CompletableFuture<String> getProcedures() { 2509 return this.<String> newMasterCaller() 2510 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2511 controller, stub, GetProceduresRequest.newBuilder().build(), 2512 (s, c, req, done) -> s.getProcedures(c, req, done), 2513 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2514 .call(); 2515 } 2516 2517 @Override 2518 public CompletableFuture<String> getLocks() { 2519 return this.<String> newMasterCaller() 2520 .action( 2521 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2522 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2523 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2524 .call(); 2525 } 2526 2527 @Override 2528 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2529 boolean offload) { 2530 return this.<Void> newMasterCaller() 2531 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2532 DecommissionRegionServersResponse, Void> call(controller, stub, 2533 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2534 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2535 .call(); 2536 } 2537 2538 @Override 2539 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2540 return this.<List<ServerName>> newMasterCaller() 2541 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2542 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2543 ListDecommissionedRegionServersRequest.newBuilder().build(), 2544 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2545 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2546 .collect(Collectors.toList()))) 2547 .call(); 2548 } 2549 2550 @Override 2551 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2552 List<byte[]> encodedRegionNames) { 2553 return this.<Void> newMasterCaller() 2554 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2555 RecommissionRegionServerResponse, Void> call(controller, stub, 2556 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2557 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2558 .call(); 2559 } 2560 2561 /** 2562 * Get the region location for the passed region name. The region name may be a full region name 2563 * or encoded region name. If the region does not found, then it'll throw an 2564 * UnknownRegionException wrapped by a {@link CompletableFuture} 2565 * @param regionNameOrEncodedRegionName region name or encoded region name 2566 * @return region location, wrapped by a {@link CompletableFuture} 2567 */ 2568 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2569 if (regionNameOrEncodedRegionName == null) { 2570 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2571 } 2572 2573 CompletableFuture<Optional<HRegionLocation>> future; 2574 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2575 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2576 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2577 // old format encodedName, should be meta region 2578 future = connection.registry.getMetaRegionLocations() 2579 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2580 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2581 } else { 2582 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2583 regionNameOrEncodedRegionName); 2584 } 2585 } else { 2586 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2587 // it needs to throw out IllegalArgumentException in case tableName is passed in. 2588 RegionInfo regionInfo; 2589 try { 2590 regionInfo = 2591 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2592 } catch (IOException ioe) { 2593 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2594 } 2595 2596 if (regionInfo.isMetaRegion()) { 2597 future = connection.registry.getMetaRegionLocations() 2598 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2599 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2600 .findFirst()); 2601 } else { 2602 future = 2603 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2604 } 2605 } 2606 2607 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2608 addListener(future, (location, err) -> { 2609 if (err != null) { 2610 returnedFuture.completeExceptionally(err); 2611 return; 2612 } 2613 if (!location.isPresent() || location.get().getRegion() == null) { 2614 returnedFuture.completeExceptionally( 2615 new UnknownRegionException("Invalid region name or encoded region name: " 2616 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2617 } else { 2618 returnedFuture.complete(location.get()); 2619 } 2620 }); 2621 return returnedFuture; 2622 } 2623 2624 /** 2625 * Get the region info for the passed region name. The region name may be a full region name or 2626 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2627 * wrapped by a {@link CompletableFuture} 2628 * @return region info, wrapped by a {@link CompletableFuture} 2629 */ 2630 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2631 if (regionNameOrEncodedRegionName == null) { 2632 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2633 } 2634 2635 if ( 2636 Bytes.equals(regionNameOrEncodedRegionName, 2637 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2638 || Bytes.equals(regionNameOrEncodedRegionName, 2639 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2640 ) { 2641 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2642 } 2643 2644 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2645 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2646 if (err != null) { 2647 future.completeExceptionally(err); 2648 } else { 2649 future.complete(location.getRegion()); 2650 } 2651 }); 2652 return future; 2653 } 2654 2655 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2656 if (numRegions < 3) { 2657 throw new IllegalArgumentException("Must create at least three regions"); 2658 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2659 throw new IllegalArgumentException("Start key must be smaller than end key"); 2660 } 2661 if (numRegions == 3) { 2662 return new byte[][] { startKey, endKey }; 2663 } 2664 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2665 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2666 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2667 } 2668 return splitKeys; 2669 } 2670 2671 private void verifySplitKeys(byte[][] splitKeys) { 2672 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2673 // Verify there are no duplicate split keys 2674 byte[] lastKey = null; 2675 for (byte[] splitKey : splitKeys) { 2676 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2677 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2678 } 2679 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2680 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2681 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2682 } 2683 lastKey = splitKey; 2684 } 2685 } 2686 2687 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2688 2689 abstract void onFinished(); 2690 2691 abstract void onError(Throwable error); 2692 2693 @Override 2694 public void accept(Void v, Throwable error) { 2695 if (error != null) { 2696 onError(error); 2697 return; 2698 } 2699 onFinished(); 2700 } 2701 } 2702 2703 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2704 protected final TableName tableName; 2705 2706 TableProcedureBiConsumer(TableName tableName) { 2707 this.tableName = tableName; 2708 } 2709 2710 abstract String getOperationType(); 2711 2712 String getDescription() { 2713 return "Operation: " + getOperationType() + ", " + "Table Name: " 2714 + tableName.getNameWithNamespaceInclAsString(); 2715 } 2716 2717 @Override 2718 void onFinished() { 2719 LOG.info(getDescription() + " completed"); 2720 } 2721 2722 @Override 2723 void onError(Throwable error) { 2724 LOG.info(getDescription() + " failed with " + error.getMessage()); 2725 } 2726 } 2727 2728 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2729 protected final String namespaceName; 2730 2731 NamespaceProcedureBiConsumer(String namespaceName) { 2732 this.namespaceName = namespaceName; 2733 } 2734 2735 abstract String getOperationType(); 2736 2737 String getDescription() { 2738 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2739 } 2740 2741 @Override 2742 void onFinished() { 2743 LOG.info(getDescription() + " completed"); 2744 } 2745 2746 @Override 2747 void onError(Throwable error) { 2748 LOG.info(getDescription() + " failed with " + error.getMessage()); 2749 } 2750 } 2751 2752 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2753 2754 CreateTableProcedureBiConsumer(TableName tableName) { 2755 super(tableName); 2756 } 2757 2758 @Override 2759 String getOperationType() { 2760 return "CREATE"; 2761 } 2762 } 2763 2764 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2765 2766 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2767 super(tableName); 2768 } 2769 2770 @Override 2771 String getOperationType() { 2772 return "MODIFY"; 2773 } 2774 } 2775 2776 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2777 extends TableProcedureBiConsumer { 2778 2779 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2780 super(tableName); 2781 } 2782 2783 @Override 2784 String getOperationType() { 2785 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2786 } 2787 } 2788 2789 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2790 2791 DeleteTableProcedureBiConsumer(TableName tableName) { 2792 super(tableName); 2793 } 2794 2795 @Override 2796 String getOperationType() { 2797 return "DELETE"; 2798 } 2799 2800 @Override 2801 void onFinished() { 2802 connection.getLocator().clearCache(this.tableName); 2803 super.onFinished(); 2804 } 2805 } 2806 2807 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2808 2809 TruncateTableProcedureBiConsumer(TableName tableName) { 2810 super(tableName); 2811 } 2812 2813 @Override 2814 String getOperationType() { 2815 return "TRUNCATE"; 2816 } 2817 } 2818 2819 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2820 2821 EnableTableProcedureBiConsumer(TableName tableName) { 2822 super(tableName); 2823 } 2824 2825 @Override 2826 String getOperationType() { 2827 return "ENABLE"; 2828 } 2829 } 2830 2831 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2832 2833 DisableTableProcedureBiConsumer(TableName tableName) { 2834 super(tableName); 2835 } 2836 2837 @Override 2838 String getOperationType() { 2839 return "DISABLE"; 2840 } 2841 } 2842 2843 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2844 2845 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2846 super(tableName); 2847 } 2848 2849 @Override 2850 String getOperationType() { 2851 return "ADD_COLUMN_FAMILY"; 2852 } 2853 } 2854 2855 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2856 2857 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2858 super(tableName); 2859 } 2860 2861 @Override 2862 String getOperationType() { 2863 return "DELETE_COLUMN_FAMILY"; 2864 } 2865 } 2866 2867 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2868 2869 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2870 super(tableName); 2871 } 2872 2873 @Override 2874 String getOperationType() { 2875 return "MODIFY_COLUMN_FAMILY"; 2876 } 2877 } 2878 2879 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2880 extends TableProcedureBiConsumer { 2881 2882 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2883 super(tableName); 2884 } 2885 2886 @Override 2887 String getOperationType() { 2888 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2889 } 2890 } 2891 2892 private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { 2893 2894 FlushTableProcedureBiConsumer(TableName tableName) { 2895 super(tableName); 2896 } 2897 2898 @Override 2899 String getOperationType() { 2900 return "FLUSH"; 2901 } 2902 } 2903 2904 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2905 2906 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2907 super(namespaceName); 2908 } 2909 2910 @Override 2911 String getOperationType() { 2912 return "CREATE_NAMESPACE"; 2913 } 2914 } 2915 2916 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2917 2918 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2919 super(namespaceName); 2920 } 2921 2922 @Override 2923 String getOperationType() { 2924 return "DELETE_NAMESPACE"; 2925 } 2926 } 2927 2928 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2929 2930 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2931 super(namespaceName); 2932 } 2933 2934 @Override 2935 String getOperationType() { 2936 return "MODIFY_NAMESPACE"; 2937 } 2938 } 2939 2940 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2941 2942 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2943 super(tableName); 2944 } 2945 2946 @Override 2947 String getOperationType() { 2948 return "MERGE_REGIONS"; 2949 } 2950 } 2951 2952 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2953 2954 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2955 super(tableName); 2956 } 2957 2958 @Override 2959 String getOperationType() { 2960 return "SPLIT_REGION"; 2961 } 2962 } 2963 2964 private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2965 2966 TruncateRegionProcedureBiConsumer(TableName tableName) { 2967 super(tableName); 2968 } 2969 2970 @Override 2971 String getOperationType() { 2972 return "TRUNCATE_REGION"; 2973 } 2974 } 2975 2976 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 2977 SnapshotProcedureBiConsumer(TableName tableName) { 2978 super(tableName); 2979 } 2980 2981 @Override 2982 String getOperationType() { 2983 return "SNAPSHOT"; 2984 } 2985 } 2986 2987 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2988 private final String peerId; 2989 private final Supplier<String> getOperation; 2990 2991 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2992 this.peerId = peerId; 2993 this.getOperation = getOperation; 2994 } 2995 2996 String getDescription() { 2997 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2998 } 2999 3000 @Override 3001 void onFinished() { 3002 LOG.info(getDescription() + " completed"); 3003 } 3004 3005 @Override 3006 void onError(Throwable error) { 3007 LOG.info(getDescription() + " failed with " + error.getMessage()); 3008 } 3009 } 3010 3011 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 3012 CompletableFuture<Void> future = new CompletableFuture<>(); 3013 addListener(procFuture, (procId, error) -> { 3014 if (error != null) { 3015 future.completeExceptionally(error); 3016 return; 3017 } 3018 getProcedureResult(procId, future, 0); 3019 }); 3020 return future; 3021 } 3022 3023 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 3024 addListener( 3025 this.<GetProcedureResultResponse> newMasterCaller() 3026 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 3027 GetProcedureResultResponse> call(controller, stub, 3028 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 3029 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 3030 .call(), 3031 (response, error) -> { 3032 if (error != null) { 3033 LOG.warn("failed to get the procedure result procId={}", procId, 3034 ConnectionUtils.translateException(error)); 3035 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 3036 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3037 return; 3038 } 3039 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 3040 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 3041 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3042 return; 3043 } 3044 if (response.hasException()) { 3045 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 3046 future.completeExceptionally(ioe); 3047 } else { 3048 future.complete(null); 3049 } 3050 }); 3051 } 3052 3053 private <T> CompletableFuture<T> failedFuture(Throwable error) { 3054 CompletableFuture<T> future = new CompletableFuture<>(); 3055 future.completeExceptionally(error); 3056 return future; 3057 } 3058 3059 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 3060 if (error != null) { 3061 future.completeExceptionally(error); 3062 return true; 3063 } 3064 return false; 3065 } 3066 3067 @Override 3068 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 3069 return getClusterMetrics(EnumSet.allOf(Option.class)); 3070 } 3071 3072 @Override 3073 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 3074 return this.<ClusterMetrics> newMasterCaller() 3075 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 3076 ClusterMetrics> call(controller, stub, 3077 RequestConverter.buildGetClusterStatusRequest(options), 3078 (s, c, req, done) -> s.getClusterStatus(c, req, done), 3079 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 3080 .call(); 3081 } 3082 3083 @Override 3084 public CompletableFuture<Void> shutdown() { 3085 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3086 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 3087 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 3088 resp -> null)) 3089 .call(); 3090 } 3091 3092 @Override 3093 public CompletableFuture<Void> stopMaster() { 3094 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3095 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 3096 controller, stub, StopMasterRequest.newBuilder().build(), 3097 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 3098 .call(); 3099 } 3100 3101 @Override 3102 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 3103 StopServerRequest request = RequestConverter 3104 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 3105 return this.<Void> newAdminCaller().priority(HIGH_QOS) 3106 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 3107 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 3108 resp -> null)) 3109 .serverName(serverName).call(); 3110 } 3111 3112 @Override 3113 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 3114 return this.<Void> newAdminCaller() 3115 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 3116 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 3117 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 3118 .serverName(serverName).call(); 3119 } 3120 3121 @Override 3122 public CompletableFuture<Void> updateConfiguration() { 3123 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3124 addListener( 3125 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 3126 (status, err) -> { 3127 if (err != null) { 3128 future.completeExceptionally(err); 3129 } else { 3130 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3131 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 3132 futures.add(updateConfiguration(status.getMasterName())); 3133 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 3134 addListener( 3135 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3136 (result, err2) -> { 3137 if (err2 != null) { 3138 future.completeExceptionally(err2); 3139 } else { 3140 future.complete(result); 3141 } 3142 }); 3143 } 3144 }); 3145 return future; 3146 } 3147 3148 @Override 3149 public CompletableFuture<Void> updateConfiguration(String groupName) { 3150 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3151 addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3152 if (err != null) { 3153 future.completeExceptionally(err); 3154 } else if (rsGroupInfo == null) { 3155 future.completeExceptionally( 3156 new IllegalArgumentException("Group does not exist: " + groupName)); 3157 } else { 3158 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3159 if (err2 != null) { 3160 future.completeExceptionally(err2); 3161 } else { 3162 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3163 List<ServerName> groupServers = status.getServersName().stream() 3164 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3165 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3166 addListener( 3167 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3168 (result, err3) -> { 3169 if (err3 != null) { 3170 future.completeExceptionally(err3); 3171 } else { 3172 future.complete(result); 3173 } 3174 }); 3175 } 3176 }); 3177 } 3178 }); 3179 return future; 3180 } 3181 3182 @Override 3183 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3184 return this.<Void> newAdminCaller() 3185 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3186 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3187 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3188 .serverName(serverName).call(); 3189 } 3190 3191 @Override 3192 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3193 return this.<Void> newAdminCaller() 3194 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3195 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3196 RequestConverter.buildClearCompactionQueuesRequest(queues), 3197 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3198 .serverName(serverName).call(); 3199 } 3200 3201 @Override 3202 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3203 return this.<List<SecurityCapability>> newMasterCaller() 3204 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3205 List<SecurityCapability>> call(controller, stub, 3206 SecurityCapabilitiesRequest.newBuilder().build(), 3207 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3208 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3209 .call(); 3210 } 3211 3212 @Override 3213 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3214 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3215 } 3216 3217 @Override 3218 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3219 TableName tableName) { 3220 Preconditions.checkNotNull(tableName, 3221 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3222 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3223 } 3224 3225 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3226 ServerName serverName) { 3227 return this.<List<RegionMetrics>> newAdminCaller() 3228 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3229 List<RegionMetrics>> adminCall(controller, stub, request, 3230 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3231 RegionMetricsBuilder::toRegionMetrics)) 3232 .serverName(serverName).call(); 3233 } 3234 3235 @Override 3236 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3237 return this.<Boolean> newMasterCaller() 3238 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3239 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3240 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3241 resp -> resp.getInMaintenanceMode())) 3242 .call(); 3243 } 3244 3245 @Override 3246 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3247 CompactType compactType) { 3248 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3249 3250 switch (compactType) { 3251 case MOB: 3252 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3253 if (err != null) { 3254 future.completeExceptionally(err); 3255 return; 3256 } 3257 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3258 3259 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3260 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3261 GetRegionInfoResponse> adminCall(controller, stub, 3262 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3263 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3264 .call(), (resp2, err2) -> { 3265 if (err2 != null) { 3266 future.completeExceptionally(err2); 3267 } else { 3268 if (resp2.hasCompactionState()) { 3269 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3270 } else { 3271 future.complete(CompactionState.NONE); 3272 } 3273 } 3274 }); 3275 }); 3276 break; 3277 case NORMAL: 3278 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3279 if (err != null) { 3280 future.completeExceptionally(err); 3281 return; 3282 } 3283 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3284 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3285 locations.stream().filter(loc -> loc.getServerName() != null) 3286 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3287 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3288 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3289 // If any region compaction state is MAJOR_AND_MINOR 3290 // the table compaction state is MAJOR_AND_MINOR, too. 3291 if (err2 != null) { 3292 future.completeExceptionally(unwrapCompletionException(err2)); 3293 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3294 future.complete(regionState); 3295 } else { 3296 regionStates.add(regionState); 3297 } 3298 })); 3299 }); 3300 addListener( 3301 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3302 (ret, err3) -> { 3303 // If future not completed, check all regions's compaction state 3304 if (!future.isCompletedExceptionally() && !future.isDone()) { 3305 CompactionState state = CompactionState.NONE; 3306 for (CompactionState regionState : regionStates) { 3307 switch (regionState) { 3308 case MAJOR: 3309 if (state == CompactionState.MINOR) { 3310 future.complete(CompactionState.MAJOR_AND_MINOR); 3311 } else { 3312 state = CompactionState.MAJOR; 3313 } 3314 break; 3315 case MINOR: 3316 if (state == CompactionState.MAJOR) { 3317 future.complete(CompactionState.MAJOR_AND_MINOR); 3318 } else { 3319 state = CompactionState.MINOR; 3320 } 3321 break; 3322 case NONE: 3323 default: 3324 } 3325 } 3326 if (!future.isDone()) { 3327 future.complete(state); 3328 } 3329 } 3330 }); 3331 }); 3332 break; 3333 default: 3334 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3335 } 3336 3337 return future; 3338 } 3339 3340 @Override 3341 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3342 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3343 addListener(getRegionLocation(regionName), (location, err) -> { 3344 if (err != null) { 3345 future.completeExceptionally(err); 3346 return; 3347 } 3348 ServerName serverName = location.getServerName(); 3349 if (serverName == null) { 3350 future 3351 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3352 return; 3353 } 3354 addListener( 3355 this.<GetRegionInfoResponse> newAdminCaller() 3356 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3357 GetRegionInfoResponse> adminCall(controller, stub, 3358 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3359 true), 3360 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3361 .serverName(serverName).call(), 3362 (resp2, err2) -> { 3363 if (err2 != null) { 3364 future.completeExceptionally(err2); 3365 } else { 3366 if (resp2.hasCompactionState()) { 3367 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3368 } else { 3369 future.complete(CompactionState.NONE); 3370 } 3371 } 3372 }); 3373 }); 3374 return future; 3375 } 3376 3377 @Override 3378 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3379 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3380 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3381 return this.<Optional<Long>> newMasterCaller() 3382 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3383 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3384 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3385 ProtobufUtil::toOptionalTimestamp)) 3386 .call(); 3387 } 3388 3389 @Override 3390 public CompletableFuture<Optional<Long>> 3391 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3392 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3393 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3394 addListener(getRegionInfo(regionName), (region, err) -> { 3395 if (err != null) { 3396 future.completeExceptionally(err); 3397 return; 3398 } 3399 MajorCompactionTimestampForRegionRequest.Builder builder = 3400 MajorCompactionTimestampForRegionRequest.newBuilder(); 3401 builder.setRegion( 3402 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3403 addListener(this.<Optional<Long>> newMasterCaller() 3404 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3405 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3406 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3407 ProtobufUtil::toOptionalTimestamp)) 3408 .call(), (timestamp, err2) -> { 3409 if (err2 != null) { 3410 future.completeExceptionally(err2); 3411 } else { 3412 future.complete(timestamp); 3413 } 3414 }); 3415 }); 3416 return future; 3417 } 3418 3419 @Override 3420 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3421 List<String> serverNamesList) { 3422 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3423 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3424 if (err != null) { 3425 future.completeExceptionally(err); 3426 return; 3427 } 3428 // Accessed by multiple threads. 3429 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3430 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3431 serverNames.stream().forEach(serverName -> { 3432 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3433 if (err2 != null) { 3434 future.completeExceptionally(unwrapCompletionException(err2)); 3435 } else { 3436 serverStates.put(serverName, serverState); 3437 } 3438 })); 3439 }); 3440 addListener( 3441 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3442 (ret, err3) -> { 3443 if (!future.isCompletedExceptionally()) { 3444 if (err3 != null) { 3445 future.completeExceptionally(err3); 3446 } else { 3447 future.complete(serverStates); 3448 } 3449 } 3450 }); 3451 }); 3452 return future; 3453 } 3454 3455 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3456 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3457 if (serverNamesList.isEmpty()) { 3458 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3459 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3460 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3461 if (err != null) { 3462 future.completeExceptionally(err); 3463 } else { 3464 future.complete(clusterMetrics.getServersName()); 3465 } 3466 }); 3467 return future; 3468 } else { 3469 List<ServerName> serverList = new ArrayList<>(); 3470 for (String regionServerName : serverNamesList) { 3471 ServerName serverName = null; 3472 try { 3473 serverName = ServerName.valueOf(regionServerName); 3474 } catch (Exception e) { 3475 future.completeExceptionally( 3476 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3477 } 3478 if (serverName == null) { 3479 future.completeExceptionally( 3480 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3481 } else { 3482 serverList.add(serverName); 3483 } 3484 } 3485 future.complete(serverList); 3486 } 3487 return future; 3488 } 3489 3490 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3491 return this.<Boolean> newAdminCaller().serverName(serverName) 3492 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3493 Boolean> adminCall(controller, stub, 3494 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3495 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3496 .call(); 3497 } 3498 3499 @Override 3500 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3501 return this.<Boolean> newMasterCaller() 3502 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3503 Boolean> call(controller, stub, 3504 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3505 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3506 (resp) -> resp.getPrevBalanceValue())) 3507 .call(); 3508 } 3509 3510 @Override 3511 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3512 return this.<BalanceResponse> newMasterCaller() 3513 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3514 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3515 (s, c, req, done) -> s.balance(c, req, done), 3516 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3517 .call(); 3518 } 3519 3520 @Override 3521 public CompletableFuture<Boolean> isBalancerEnabled() { 3522 return this.<Boolean> newMasterCaller() 3523 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3524 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3525 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3526 .call(); 3527 } 3528 3529 @Override 3530 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3531 return this.<Boolean> newMasterCaller() 3532 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3533 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3534 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3535 (resp) -> resp.getPrevNormalizerValue())) 3536 .call(); 3537 } 3538 3539 @Override 3540 public CompletableFuture<Boolean> isNormalizerEnabled() { 3541 return this.<Boolean> newMasterCaller() 3542 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3543 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3544 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3545 .call(); 3546 } 3547 3548 @Override 3549 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3550 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3551 } 3552 3553 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3554 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3555 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3556 } 3557 3558 @Override 3559 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3560 return this.<Boolean> newMasterCaller() 3561 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3562 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3563 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3564 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3565 (resp) -> resp.getPrevValue())) 3566 .call(); 3567 } 3568 3569 @Override 3570 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3571 return this.<Boolean> newMasterCaller() 3572 .action( 3573 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3574 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3575 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3576 .call(); 3577 } 3578 3579 @Override 3580 public CompletableFuture<Boolean> runCleanerChore() { 3581 return this.<Boolean> newMasterCaller() 3582 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3583 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3584 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3585 (resp) -> resp.getCleanerChoreRan())) 3586 .call(); 3587 } 3588 3589 @Override 3590 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3591 return this.<Boolean> newMasterCaller() 3592 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3593 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3594 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3595 .call(); 3596 } 3597 3598 @Override 3599 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3600 return this.<Boolean> newMasterCaller() 3601 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3602 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3603 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3604 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3605 .call(); 3606 } 3607 3608 @Override 3609 public CompletableFuture<Integer> runCatalogJanitor() { 3610 return this.<Integer> newMasterCaller() 3611 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3612 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3613 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3614 .call(); 3615 } 3616 3617 @Override 3618 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3619 ServiceCaller<S, R> callable) { 3620 MasterCoprocessorRpcChannelImpl channel = 3621 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3622 S stub = stubMaker.apply(channel); 3623 CompletableFuture<R> future = new CompletableFuture<>(); 3624 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3625 callable.call(stub, controller, resp -> { 3626 if (controller.failed()) { 3627 future.completeExceptionally(controller.getFailed()); 3628 } else { 3629 future.complete(resp); 3630 } 3631 }); 3632 return future; 3633 } 3634 3635 @Override 3636 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3637 ServiceCaller<S, R> callable, ServerName serverName) { 3638 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3639 this.<Message> newServerCaller().serverName(serverName)); 3640 S stub = stubMaker.apply(channel); 3641 CompletableFuture<R> future = new CompletableFuture<>(); 3642 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3643 callable.call(stub, controller, resp -> { 3644 if (controller.failed()) { 3645 future.completeExceptionally(controller.getFailed()); 3646 } else { 3647 future.complete(resp); 3648 } 3649 }); 3650 return future; 3651 } 3652 3653 @Override 3654 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3655 return this.<List<ServerName>> newMasterCaller() 3656 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3657 List<ServerName>> call(controller, stub, 3658 RequestConverter.buildClearDeadServersRequest(servers), 3659 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3660 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3661 .call(); 3662 } 3663 3664 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3665 return this.connection.callerFactory.<T> serverRequest() 3666 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3667 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3668 .pause(pauseNs, TimeUnit.NANOSECONDS) 3669 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3670 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3671 } 3672 3673 @Override 3674 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3675 if (tableName == null) { 3676 return failedFuture(new IllegalArgumentException("Table name is null")); 3677 } 3678 CompletableFuture<Void> future = new CompletableFuture<>(); 3679 addListener(tableExists(tableName), (exist, err) -> { 3680 if (err != null) { 3681 future.completeExceptionally(err); 3682 return; 3683 } 3684 if (!exist) { 3685 future.completeExceptionally(new TableNotFoundException( 3686 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3687 return; 3688 } 3689 addListener(getTableSplits(tableName), (splits, err1) -> { 3690 if (err1 != null) { 3691 future.completeExceptionally(err1); 3692 } else { 3693 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3694 if (err2 != null) { 3695 future.completeExceptionally(err2); 3696 } else { 3697 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3698 if (err3 != null) { 3699 future.completeExceptionally(err3); 3700 } else { 3701 future.complete(result3); 3702 } 3703 }); 3704 } 3705 }); 3706 } 3707 }); 3708 }); 3709 return future; 3710 } 3711 3712 @Override 3713 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3714 if (tableName == null) { 3715 return failedFuture(new IllegalArgumentException("Table name is null")); 3716 } 3717 CompletableFuture<Void> future = new CompletableFuture<>(); 3718 addListener(tableExists(tableName), (exist, err) -> { 3719 if (err != null) { 3720 future.completeExceptionally(err); 3721 return; 3722 } 3723 if (!exist) { 3724 future.completeExceptionally(new TableNotFoundException( 3725 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3726 return; 3727 } 3728 addListener(setTableReplication(tableName, false), (result, err2) -> { 3729 if (err2 != null) { 3730 future.completeExceptionally(err2); 3731 } else { 3732 future.complete(result); 3733 } 3734 }); 3735 }); 3736 return future; 3737 } 3738 3739 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3740 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3741 addListener( 3742 getRegions(tableName).thenApply(regions -> regions.stream() 3743 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3744 (regions, err2) -> { 3745 if (err2 != null) { 3746 future.completeExceptionally(err2); 3747 return; 3748 } 3749 if (regions.size() == 1) { 3750 future.complete(null); 3751 } else { 3752 byte[][] splits = new byte[regions.size() - 1][]; 3753 for (int i = 1; i < regions.size(); i++) { 3754 splits[i - 1] = regions.get(i).getStartKey(); 3755 } 3756 future.complete(splits); 3757 } 3758 }); 3759 return future; 3760 } 3761 3762 /** 3763 * Connect to peer and check the table descriptor on peer: 3764 * <ol> 3765 * <li>Create the same table on peer when not exist.</li> 3766 * <li>Throw an exception if the table already has replication enabled on any of the column 3767 * families.</li> 3768 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3769 * </ol> 3770 * @param tableName name of the table to sync to the peer 3771 * @param splits table split keys 3772 */ 3773 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3774 byte[][] splits) { 3775 CompletableFuture<Void> future = new CompletableFuture<>(); 3776 addListener(listReplicationPeers(), (peers, err) -> { 3777 if (err != null) { 3778 future.completeExceptionally(err); 3779 return; 3780 } 3781 if (peers == null || peers.size() <= 0) { 3782 future.completeExceptionally( 3783 new IllegalArgumentException("Found no peer cluster for replication.")); 3784 return; 3785 } 3786 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3787 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3788 .forEach(peer -> { 3789 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3790 }); 3791 addListener( 3792 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3793 (result, err2) -> { 3794 if (err2 != null) { 3795 future.completeExceptionally(err2); 3796 } else { 3797 future.complete(result); 3798 } 3799 }); 3800 }); 3801 return future; 3802 } 3803 3804 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3805 ReplicationPeerDescription peer) { 3806 Configuration peerConf; 3807 try { 3808 peerConf = ReplicationPeerConfigUtil 3809 .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig()); 3810 } catch (IOException e) { 3811 return failedFuture(e); 3812 } 3813 URI connectionUri = 3814 ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey()); 3815 CompletableFuture<Void> future = new CompletableFuture<>(); 3816 addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> { 3817 if (err != null) { 3818 future.completeExceptionally(err); 3819 return; 3820 } 3821 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3822 if (err1 != null) { 3823 future.completeExceptionally(err1); 3824 return; 3825 } 3826 AsyncAdmin peerAdmin = conn.getAdmin(); 3827 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3828 if (err2 != null) { 3829 future.completeExceptionally(err2); 3830 return; 3831 } 3832 if (!exist) { 3833 CompletableFuture<Void> createTableFuture = null; 3834 if (splits == null) { 3835 createTableFuture = peerAdmin.createTable(tableDesc); 3836 } else { 3837 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3838 } 3839 addListener(createTableFuture, (result, err3) -> { 3840 if (err3 != null) { 3841 future.completeExceptionally(err3); 3842 } else { 3843 future.complete(result); 3844 } 3845 }); 3846 } else { 3847 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3848 (result, err4) -> { 3849 if (err4 != null) { 3850 future.completeExceptionally(err4); 3851 } else { 3852 future.complete(result); 3853 } 3854 }); 3855 } 3856 }); 3857 }); 3858 }); 3859 return future; 3860 } 3861 3862 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3863 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3864 CompletableFuture<Void> future = new CompletableFuture<>(); 3865 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3866 if (err != null) { 3867 future.completeExceptionally(err); 3868 return; 3869 } 3870 if (peerTableDesc == null) { 3871 future.completeExceptionally( 3872 new IllegalArgumentException("Failed to get table descriptor for table " 3873 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3874 return; 3875 } 3876 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3877 future.completeExceptionally(new IllegalArgumentException( 3878 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3879 + ", but the table descriptors are not same when compared with source cluster." 3880 + " Thus can not enable the table's replication switch.")); 3881 return; 3882 } 3883 future.complete(null); 3884 }); 3885 return future; 3886 } 3887 3888 /** 3889 * Set the table's replication switch if the table's replication switch is already not set. 3890 * @param tableName name of the table 3891 * @param enableRep is replication switch enable or disable 3892 */ 3893 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3894 CompletableFuture<Void> future = new CompletableFuture<>(); 3895 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3896 if (err != null) { 3897 future.completeExceptionally(err); 3898 return; 3899 } 3900 if (!tableDesc.matchReplicationScope(enableRep)) { 3901 int scope = 3902 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3903 TableDescriptor newTableDesc = 3904 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3905 addListener(modifyTable(newTableDesc), (result, err2) -> { 3906 if (err2 != null) { 3907 future.completeExceptionally(err2); 3908 } else { 3909 future.complete(result); 3910 } 3911 }); 3912 } else { 3913 future.complete(null); 3914 } 3915 }); 3916 return future; 3917 } 3918 3919 @Override 3920 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 3921 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 3922 request.setPeerId(peerId); 3923 return this.<Boolean> newMasterCaller() 3924 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 3925 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 3926 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 3927 resp -> resp.getIsEnabled())) 3928 .call(); 3929 } 3930 3931 private void waitUntilAllReplicationPeerModificationProceduresDone( 3932 CompletableFuture<Boolean> future, boolean prevOn, int retries) { 3933 CompletableFuture<List<ProcedureProtos.Procedure>> callFuture = 3934 this.<List<ProcedureProtos.Procedure>> newMasterCaller() 3935 .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest, 3936 GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call( 3937 controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), 3938 (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), 3939 resp -> resp.getProcedureList())) 3940 .call(); 3941 addListener(callFuture, (r, e) -> { 3942 if (e != null) { 3943 future.completeExceptionally(e); 3944 } else if (r.isEmpty()) { 3945 // we are done 3946 future.complete(prevOn); 3947 } else { 3948 // retry later to see if the procedures are done 3949 retryTimer.newTimeout( 3950 t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), 3951 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3952 } 3953 }); 3954 } 3955 3956 @Override 3957 public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, 3958 boolean drainProcedures) { 3959 ReplicationPeerModificationSwitchRequest request = 3960 ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); 3961 CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller() 3962 .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest, 3963 ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request, 3964 (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), 3965 resp -> resp.getPreviousValue())) 3966 .call(); 3967 // if we do not need to wait all previous peer modification procedure done, or we are enabling 3968 // peer modification, just return here. 3969 if (!drainProcedures || on) { 3970 return callFuture; 3971 } 3972 // otherwise we need to wait until all previous peer modification procedure done 3973 CompletableFuture<Boolean> future = new CompletableFuture<>(); 3974 addListener(callFuture, (prevOn, err) -> { 3975 if (err != null) { 3976 future.completeExceptionally(err); 3977 return; 3978 } 3979 // even if the previous state is disabled, we still need to wait here, as there could be 3980 // another client thread which called this method just before us and have already changed the 3981 // state to off, but there are still peer modification procedures not finished, so we should 3982 // also wait here. 3983 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 3984 }); 3985 return future; 3986 } 3987 3988 @Override 3989 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 3990 return this.<Boolean> newMasterCaller() 3991 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 3992 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 3993 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 3994 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 3995 (resp) -> resp.getEnabled())) 3996 .call(); 3997 } 3998 3999 @Override 4000 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 4001 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 4002 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 4003 if (err != null) { 4004 future.completeExceptionally(err); 4005 return; 4006 } 4007 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 4008 locations.stream().filter(l -> l.getRegion() != null) 4009 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 4010 .collect(Collectors.groupingBy(l -> l.getServerName(), 4011 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 4012 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 4013 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 4014 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 4015 futures 4016 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 4017 if (err2 != null) { 4018 future.completeExceptionally(unwrapCompletionException(err2)); 4019 } else { 4020 aggregator.append(stats); 4021 } 4022 })); 4023 } 4024 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 4025 (ret, err3) -> { 4026 if (err3 != null) { 4027 future.completeExceptionally(unwrapCompletionException(err3)); 4028 } else { 4029 future.complete(aggregator.sum()); 4030 } 4031 }); 4032 }); 4033 return future; 4034 } 4035 4036 @Override 4037 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 4038 boolean preserveSplits) { 4039 CompletableFuture<Void> future = new CompletableFuture<>(); 4040 addListener(tableExists(tableName), (exist, err) -> { 4041 if (err != null) { 4042 future.completeExceptionally(err); 4043 return; 4044 } 4045 if (!exist) { 4046 future.completeExceptionally(new TableNotFoundException(tableName)); 4047 return; 4048 } 4049 addListener(tableExists(newTableName), (exist1, err1) -> { 4050 if (err1 != null) { 4051 future.completeExceptionally(err1); 4052 return; 4053 } 4054 if (exist1) { 4055 future.completeExceptionally(new TableExistsException(newTableName)); 4056 return; 4057 } 4058 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 4059 if (err2 != null) { 4060 future.completeExceptionally(err2); 4061 return; 4062 } 4063 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 4064 if (preserveSplits) { 4065 addListener(getTableSplits(tableName), (splits, err3) -> { 4066 if (err3 != null) { 4067 future.completeExceptionally(err3); 4068 } else { 4069 addListener( 4070 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 4071 (result, err4) -> { 4072 if (err4 != null) { 4073 future.completeExceptionally(err4); 4074 } else { 4075 future.complete(result); 4076 } 4077 }); 4078 } 4079 }); 4080 } else { 4081 addListener(createTable(newTableDesc), (result, err5) -> { 4082 if (err5 != null) { 4083 future.completeExceptionally(err5); 4084 } else { 4085 future.complete(result); 4086 } 4087 }); 4088 } 4089 }); 4090 }); 4091 }); 4092 return future; 4093 } 4094 4095 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 4096 List<RegionInfo> hris) { 4097 return this.<CacheEvictionStats> newAdminCaller() 4098 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 4099 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 4100 RequestConverter.buildClearRegionBlockCacheRequest(hris), 4101 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 4102 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 4103 .serverName(serverName).call(); 4104 } 4105 4106 @Override 4107 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 4108 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4109 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 4110 Boolean> call(controller, stub, 4111 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 4112 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 4113 resp -> resp.getPreviousRpcThrottleEnabled())) 4114 .call(); 4115 return future; 4116 } 4117 4118 @Override 4119 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 4120 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4121 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 4122 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 4123 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 4124 resp -> resp.getRpcThrottleEnabled())) 4125 .call(); 4126 return future; 4127 } 4128 4129 @Override 4130 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 4131 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4132 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 4133 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 4134 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 4135 .build(), 4136 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 4137 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 4138 .call(); 4139 return future; 4140 } 4141 4142 @Override 4143 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 4144 return this.<Map<TableName, Long>> newMasterCaller() 4145 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 4146 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4147 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4148 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4149 resp -> resp.getSizesList().stream().collect(Collectors 4150 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4151 .call(); 4152 } 4153 4154 @Override 4155 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4156 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4157 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4158 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4159 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4160 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4161 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4162 resp -> resp.getSnapshotsList().stream() 4163 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 4164 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4165 .serverName(serverName).call(); 4166 } 4167 4168 private CompletableFuture<SpaceQuotaSnapshot> 4169 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4170 return this.<SpaceQuotaSnapshot> newMasterCaller() 4171 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4172 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4173 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4174 .call(); 4175 } 4176 4177 @Override 4178 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4179 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4180 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4181 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4182 } 4183 4184 @Override 4185 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4186 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4187 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4188 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4189 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4190 } 4191 4192 @Override 4193 public CompletableFuture<Void> grant(UserPermission userPermission, 4194 boolean mergeExistingPermissions) { 4195 return this.<Void> newMasterCaller() 4196 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4197 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4198 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4199 .call(); 4200 } 4201 4202 @Override 4203 public CompletableFuture<Void> revoke(UserPermission userPermission) { 4204 return this.<Void> newMasterCaller() 4205 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4206 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4207 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4208 .call(); 4209 } 4210 4211 @Override 4212 public CompletableFuture<List<UserPermission>> 4213 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4214 return this.<List<UserPermission>> newMasterCaller() 4215 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4216 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4217 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4218 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4219 resp -> resp.getUserPermissionList().stream() 4220 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4221 .collect(Collectors.toList()))) 4222 .call(); 4223 } 4224 4225 @Override 4226 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4227 List<Permission> permissions) { 4228 return this.<List<Boolean>> newMasterCaller() 4229 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4230 List<Boolean>> call(controller, stub, 4231 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4232 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4233 resp -> resp.getHasUserPermissionList())) 4234 .call(); 4235 } 4236 4237 @Override 4238 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4239 return this.<Boolean> newMasterCaller() 4240 .action((controller, stub) -> this.call(controller, stub, 4241 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4242 MasterService.Interface::switchSnapshotCleanup, 4243 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4244 .call(); 4245 } 4246 4247 @Override 4248 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4249 return this.<Boolean> newMasterCaller() 4250 .action((controller, stub) -> this.call(controller, stub, 4251 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4252 MasterService.Interface::isSnapshotCleanupEnabled, 4253 IsSnapshotCleanupEnabledResponse::getEnabled)) 4254 .call(); 4255 } 4256 4257 @Override 4258 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4259 return this.<Void> newMasterCaller() 4260 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4261 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4262 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4263 .call(); 4264 } 4265 4266 @Override 4267 public CompletableFuture<Void> addRSGroup(String groupName) { 4268 return this.<Void> newMasterCaller() 4269 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4270 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4271 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4272 .call(); 4273 } 4274 4275 @Override 4276 public CompletableFuture<Void> removeRSGroup(String groupName) { 4277 return this.<Void> newMasterCaller() 4278 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4279 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4280 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4281 .call(); 4282 } 4283 4284 @Override 4285 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4286 BalanceRequest request) { 4287 return this.<BalanceResponse> newMasterCaller() 4288 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4289 BalanceResponse> call(controller, stub, 4290 ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4291 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4292 .call(); 4293 } 4294 4295 @Override 4296 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4297 return this.<List<RSGroupInfo>> newMasterCaller() 4298 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4299 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4300 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4301 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4302 .call(); 4303 } 4304 4305 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4306 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4307 final String logType) { 4308 if (CollectionUtils.isEmpty(serverNames)) { 4309 return CompletableFuture.completedFuture(Collections.emptyList()); 4310 } 4311 return CompletableFuture.supplyAsync(() -> serverNames 4312 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4313 filterParams, limit, logType)) 4314 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4315 } 4316 4317 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4318 Map<String, Object> filterParams, int limit, String logType) { 4319 return this.<List<LogEntry>> newAdminCaller() 4320 .action((controller, stub) -> this.adminCall(controller, stub, 4321 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4322 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4323 .serverName(serverName).call(); 4324 } 4325 4326 @Override 4327 public CompletableFuture<List<Boolean>> 4328 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4329 if (CollectionUtils.isEmpty(serverNames)) { 4330 return CompletableFuture.completedFuture(Collections.emptyList()); 4331 } 4332 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4333 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4334 return convertToFutureOfList(clearSlowLogResponseList); 4335 } 4336 4337 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4338 return this.<Boolean> newAdminCaller() 4339 .action((controller, stub) -> this.adminCall(controller, stub, 4340 RequestConverter.buildClearSlowLogResponseRequest(), 4341 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4342 .serverName(serverName).call(); 4343 } 4344 4345 private static <T> CompletableFuture<List<T>> 4346 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4347 CompletableFuture<Void> allDoneFuture = 4348 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4349 return allDoneFuture 4350 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4351 } 4352 4353 @Override 4354 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4355 return this.<List<TableName>> newMasterCaller() 4356 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4357 List<TableName>> call(controller, stub, 4358 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4359 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4360 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4361 .call(); 4362 } 4363 4364 @Override 4365 public CompletableFuture<Pair<List<String>, List<TableName>>> 4366 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4367 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4368 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4369 GetConfiguredNamespacesAndTablesInRSGroupResponse, 4370 Pair<List<String>, List<TableName>>> call(controller, stub, 4371 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4372 .build(), 4373 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4374 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4375 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4376 .call(); 4377 } 4378 4379 @Override 4380 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4381 return this.<RSGroupInfo> newMasterCaller() 4382 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4383 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4384 GetRSGroupInfoOfServerRequest.newBuilder() 4385 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4386 .setPort(hostPort.getPort()).build()) 4387 .build(), 4388 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4389 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4390 .call(); 4391 } 4392 4393 @Override 4394 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4395 return this.<Void> newMasterCaller() 4396 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4397 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4398 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4399 .call(); 4400 } 4401 4402 @Override 4403 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4404 CompletableFuture<Void> future = new CompletableFuture<>(); 4405 for (TableName tableName : tables) { 4406 addListener(tableExists(tableName), (exist, err) -> { 4407 if (err != null) { 4408 future.completeExceptionally(err); 4409 return; 4410 } 4411 if (!exist) { 4412 future.completeExceptionally(new TableNotFoundException(tableName)); 4413 return; 4414 } 4415 }); 4416 } 4417 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4418 if (err != null) { 4419 future.completeExceptionally(err); 4420 return; 4421 } 4422 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4423 future.complete(null); 4424 return; 4425 } 4426 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4427 for (TableDescriptor td : tableDescriptions) { 4428 newTableDescriptors 4429 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4430 } 4431 addListener( 4432 CompletableFuture.allOf( 4433 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4434 (v, e) -> { 4435 if (e != null) { 4436 future.completeExceptionally(e); 4437 } else { 4438 future.complete(v); 4439 } 4440 }); 4441 }); 4442 return future; 4443 } 4444 4445 @Override 4446 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4447 return this.<RSGroupInfo> newMasterCaller() 4448 .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4449 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4450 GetRSGroupInfoOfTableRequest.newBuilder() 4451 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4452 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4453 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4454 .call(); 4455 } 4456 4457 @Override 4458 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4459 return this.<RSGroupInfo> newMasterCaller() 4460 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4461 RSGroupInfo> call(controller, stub, 4462 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4463 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4464 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4465 .call(); 4466 } 4467 4468 @Override 4469 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4470 return this.<Void> newMasterCaller() 4471 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4472 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4473 .setNewRsgroupName(newName).build(), 4474 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4475 .call(); 4476 } 4477 4478 @Override 4479 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4480 Map<String, String> configuration) { 4481 UpdateRSGroupConfigRequest.Builder request = 4482 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4483 if (configuration != null) { 4484 configuration.entrySet().forEach(e -> request.addConfiguration( 4485 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4486 } 4487 return this.<Void> newMasterCaller() 4488 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4489 Void> call(controller, stub, request.build(), 4490 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4491 .call(); 4492 } 4493 4494 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4495 return this.<List<LogEntry>> newMasterCaller() 4496 .action((controller, stub) -> this.call(controller, stub, 4497 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4498 ProtobufUtil::toBalancerDecisionResponse)) 4499 .call(); 4500 } 4501 4502 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4503 return this.<List<LogEntry>> newMasterCaller() 4504 .action((controller, stub) -> this.call(controller, stub, 4505 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4506 ProtobufUtil::toBalancerRejectionResponse)) 4507 .call(); 4508 } 4509 4510 @Override 4511 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4512 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4513 if (logType == null || serverType == null) { 4514 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4515 } 4516 switch (logType) { 4517 case "SLOW_LOG": 4518 case "LARGE_LOG": 4519 if (ServerType.MASTER.equals(serverType)) { 4520 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4521 } 4522 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4523 case "BALANCER_DECISION": 4524 if (ServerType.REGION_SERVER.equals(serverType)) { 4525 throw new IllegalArgumentException( 4526 "Balancer Decision logs are not maintained by HRegionServer"); 4527 } 4528 return getBalancerDecisions(limit); 4529 case "BALANCER_REJECTION": 4530 if (ServerType.REGION_SERVER.equals(serverType)) { 4531 throw new IllegalArgumentException( 4532 "Balancer Rejection logs are not maintained by HRegionServer"); 4533 } 4534 return getBalancerRejections(limit); 4535 default: 4536 return CompletableFuture.completedFuture(Collections.emptyList()); 4537 } 4538 } 4539 4540 @Override 4541 public CompletableFuture<Void> flushMasterStore() { 4542 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4543 return this.<Void> newMasterCaller() 4544 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4545 Void> call(controller, stub, request.build(), 4546 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4547 .call(); 4548 } 4549 4550 @Override 4551 public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) { 4552 GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder(); 4553 return this.<List<String>> newAdminCaller() 4554 .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse, 4555 List<String>> adminCall(controller, stub, request.build(), 4556 (s, c, req, done) -> s.getCachedFilesList(c, req, done), 4557 resp -> resp.getCachedFilesList())) 4558 .serverName(serverName).call(); 4559 } 4560}