001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.net.URI; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.Set; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.function.BiConsumer; 043import java.util.function.Consumer; 044import java.util.function.Function; 045import java.util.function.Supplier; 046import java.util.regex.Pattern; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 052import org.apache.hadoop.hbase.CatalogFamilyFormat; 053import org.apache.hadoop.hbase.ClientMetaTableAccessor; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.HRegionLocation; 060import org.apache.hadoop.hbase.NamespaceDescriptor; 061import org.apache.hadoop.hbase.RegionLocations; 062import org.apache.hadoop.hbase.RegionMetrics; 063import org.apache.hadoop.hbase.RegionMetricsBuilder; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableExistsException; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotDisabledException; 068import org.apache.hadoop.hbase.TableNotEnabledException; 069import org.apache.hadoop.hbase.TableNotFoundException; 070import org.apache.hadoop.hbase.UnknownRegionException; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.Scan.ReadType; 075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 076import org.apache.hadoop.hbase.client.replication.TableCFs; 077import org.apache.hadoop.hbase.client.security.SecurityCapability; 078import org.apache.hadoop.hbase.exceptions.DeserializationException; 079import org.apache.hadoop.hbase.ipc.HBaseRpcController; 080import org.apache.hadoop.hbase.net.Address; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 086import org.apache.hadoop.hbase.replication.ReplicationException; 087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 089import org.apache.hadoop.hbase.replication.SyncReplicationState; 090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 092import org.apache.hadoop.hbase.security.access.Permission; 093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 094import org.apache.hadoop.hbase.security.access.UserPermission; 095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 100import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 101import org.apache.hadoop.hbase.util.Pair; 102import org.apache.hadoop.hbase.util.Strings; 103import org.apache.yetus.audience.InterfaceAudience; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 108import org.apache.hbase.thirdparty.com.google.protobuf.Message; 109import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 110import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 111import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 112import org.apache.hbase.thirdparty.io.netty.util.Timeout; 113import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 114import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 115 116import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 117import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 357import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 358 359/** 360 * The implementation of AsyncAdmin. 361 * <p> 362 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 363 * be finished inside the rpc framework thread, which means that the callbacks registered to the 364 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 365 * this class should not try to do time consuming tasks in the callbacks. 366 * @since 2.0.0 367 * @see AsyncHBaseAdmin 368 * @see AsyncConnection#getAdmin() 369 * @see AsyncConnection#getAdminBuilder() 370 */ 371@InterfaceAudience.Private 372class RawAsyncHBaseAdmin implements AsyncAdmin { 373 374 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 375 376 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 377 378 private final AsyncConnectionImpl connection; 379 380 private final HashedWheelTimer retryTimer; 381 382 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 383 384 private final long rpcTimeoutNs; 385 386 private final long operationTimeoutNs; 387 388 private final long pauseNs; 389 390 private final long pauseNsForServerOverloaded; 391 392 private final int maxAttempts; 393 394 private final int startLogErrorsCnt; 395 396 private final NonceGenerator ng; 397 398 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 399 AsyncAdminBuilderBase builder) { 400 this.connection = connection; 401 this.retryTimer = retryTimer; 402 this.metaTable = connection.getTable(META_TABLE_NAME); 403 this.rpcTimeoutNs = builder.rpcTimeoutNs; 404 this.operationTimeoutNs = builder.operationTimeoutNs; 405 this.pauseNs = builder.pauseNs; 406 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 407 LOG.warn( 408 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 409 + " the normal pause value {} ms, use the greater one instead", 410 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 411 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 412 this.pauseNsForServerOverloaded = builder.pauseNs; 413 } else { 414 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 415 } 416 this.maxAttempts = builder.maxAttempts; 417 this.startLogErrorsCnt = builder.startLogErrorsCnt; 418 this.ng = connection.getNonceGenerator(); 419 } 420 421 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 422 return this.connection.callerFactory.<T> masterRequest() 423 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 424 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 425 .pause(pauseNs, TimeUnit.NANOSECONDS) 426 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 427 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 428 } 429 430 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 431 return this.connection.callerFactory.<T> adminRequest() 432 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 433 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 434 .pause(pauseNs, TimeUnit.NANOSECONDS) 435 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 436 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 437 } 438 439 @FunctionalInterface 440 private interface MasterRpcCall<RESP, REQ> { 441 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 442 RpcCallback<RESP> done); 443 } 444 445 @FunctionalInterface 446 private interface AdminRpcCall<RESP, REQ> { 447 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 448 RpcCallback<RESP> done); 449 } 450 451 @FunctionalInterface 452 private interface Converter<D, S> { 453 D convert(S src) throws IOException; 454 } 455 456 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 457 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 458 Converter<RESP, PRESP> respConverter) { 459 CompletableFuture<RESP> future = new CompletableFuture<>(); 460 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 461 462 @Override 463 public void run(PRESP resp) { 464 if (controller.failed()) { 465 future.completeExceptionally(controller.getFailed()); 466 } else { 467 try { 468 future.complete(respConverter.convert(resp)); 469 } catch (IOException e) { 470 future.completeExceptionally(e); 471 } 472 } 473 } 474 }); 475 return future; 476 } 477 478 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 479 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 480 Converter<RESP, PRESP> respConverter) { 481 CompletableFuture<RESP> future = new CompletableFuture<>(); 482 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 483 484 @Override 485 public void run(PRESP resp) { 486 if (controller.failed()) { 487 future.completeExceptionally(controller.getFailed()); 488 } else { 489 try { 490 future.complete(respConverter.convert(resp)); 491 } catch (IOException e) { 492 future.completeExceptionally(e); 493 } 494 } 495 } 496 }); 497 return future; 498 } 499 500 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 501 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 502 ProcedureBiConsumer consumer) { 503 return procedureCall(b -> { 504 }, preq, rpcCall, respConverter, consumer); 505 } 506 507 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 508 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 509 ProcedureBiConsumer consumer) { 510 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 511 } 512 513 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 514 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 515 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 516 ProcedureBiConsumer consumer) { 517 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 518 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 519 prioritySetter.accept(builder); 520 CompletableFuture<Long> procFuture = builder.call(); 521 CompletableFuture<Void> future = waitProcedureResult(procFuture); 522 addListener(future, consumer); 523 return future; 524 } 525 526 @Override 527 public CompletableFuture<Boolean> tableExists(TableName tableName) { 528 if (TableName.isMetaTableName(tableName)) { 529 return CompletableFuture.completedFuture(true); 530 } 531 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 532 } 533 534 @Override 535 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 536 return getTableDescriptors( 537 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 538 } 539 540 /** 541 * {@link #listTableDescriptors(boolean)} 542 */ 543 @Override 544 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 545 boolean includeSysTables) { 546 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 547 + "use listTableDescriptors(boolean) instead"); 548 return getTableDescriptors( 549 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 550 } 551 552 @Override 553 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 554 Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, " 555 + "use listTableDescriptors(boolean) instead"); 556 if (tableNames.isEmpty()) { 557 return CompletableFuture.completedFuture(Collections.emptyList()); 558 } 559 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 560 } 561 562 private CompletableFuture<List<TableDescriptor>> 563 getTableDescriptors(GetTableDescriptorsRequest request) { 564 return this.<List<TableDescriptor>> newMasterCaller() 565 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 566 List<TableDescriptor>> call(controller, stub, request, 567 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 568 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 569 .call(); 570 } 571 572 @Override 573 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 574 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 575 } 576 577 @Override 578 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 579 boolean includeSysTables) { 580 Preconditions.checkNotNull(pattern, 581 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 582 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 583 } 584 585 @Override 586 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 587 return this.<List<TableName>> newMasterCaller() 588 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 589 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 590 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 591 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 592 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 593 .call(); 594 } 595 596 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 597 return this.<List<TableName>> newMasterCaller() 598 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 599 List<TableName>> call(controller, stub, request, 600 (s, c, req, done) -> s.getTableNames(c, req, done), 601 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 602 .call(); 603 } 604 605 @Override 606 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 607 return this.<List<TableDescriptor>> newMasterCaller() 608 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 609 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 610 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 611 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 612 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 613 .call(); 614 } 615 616 @Override 617 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 618 return this.<List<TableDescriptor>> newMasterCaller() 619 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 620 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 621 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 622 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 623 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 624 .call(); 625 } 626 627 @Override 628 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 629 return this.<List<TableName>> newMasterCaller() 630 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 631 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 632 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 633 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 634 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 635 .call(); 636 } 637 638 @Override 639 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 640 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 641 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 642 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 643 List<TableSchema>> call(controller, stub, 644 RequestConverter.buildGetTableDescriptorsRequest(tableName), 645 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 646 (resp) -> resp.getTableSchemaList())) 647 .call(), (tableSchemas, error) -> { 648 if (error != null) { 649 future.completeExceptionally(error); 650 return; 651 } 652 if (!tableSchemas.isEmpty()) { 653 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 654 } else { 655 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 656 } 657 }); 658 return future; 659 } 660 661 @Override 662 public CompletableFuture<Void> createTable(TableDescriptor desc) { 663 return createTable(desc.getTableName(), 664 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 665 } 666 667 @Override 668 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 669 int numRegions) { 670 try { 671 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 672 } catch (IllegalArgumentException e) { 673 return failedFuture(e); 674 } 675 } 676 677 @Override 678 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 679 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 680 + " use createTable(TableDescriptor) instead"); 681 try { 682 verifySplitKeys(splitKeys); 683 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 684 splitKeys, ng.getNonceGroup(), ng.newNonce())); 685 } catch (IllegalArgumentException e) { 686 return failedFuture(e); 687 } 688 } 689 690 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 691 Preconditions.checkNotNull(tableName, "table name is null"); 692 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 693 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 694 new CreateTableProcedureBiConsumer(tableName)); 695 } 696 697 @Override 698 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 699 return modifyTable(desc, true); 700 } 701 702 @Override 703 public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) { 704 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 705 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 706 ng.newNonce(), reopenRegions), 707 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 708 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 709 } 710 711 @Override 712 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 713 return this.<ModifyTableStoreFileTrackerRequest, 714 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 715 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 716 ng.getNonceGroup(), ng.newNonce()), 717 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 718 (resp) -> resp.getProcId(), 719 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 720 } 721 722 @Override 723 public CompletableFuture<Void> deleteTable(TableName tableName) { 724 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 725 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 726 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 727 new DeleteTableProcedureBiConsumer(tableName)); 728 } 729 730 @Override 731 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 732 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 733 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 734 ng.newNonce()), 735 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 736 new TruncateTableProcedureBiConsumer(tableName)); 737 } 738 739 @Override 740 public CompletableFuture<Void> enableTable(TableName tableName) { 741 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 742 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 743 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 744 new EnableTableProcedureBiConsumer(tableName)); 745 } 746 747 @Override 748 public CompletableFuture<Void> disableTable(TableName tableName) { 749 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 750 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 751 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 752 new DisableTableProcedureBiConsumer(tableName)); 753 } 754 755 /** 756 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 757 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 758 * targetState). 759 */ 760 private static CompletableFuture<Boolean> completeCheckTableState( 761 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 762 TableState.State targetState, TableName tableName) { 763 if (error != null) { 764 future.completeExceptionally(error); 765 } else { 766 if (tableState != null) { 767 future.complete(tableState.inStates(targetState)); 768 } else { 769 future.completeExceptionally(new TableNotFoundException(tableName)); 770 } 771 } 772 return future; 773 } 774 775 @Override 776 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 777 if (TableName.isMetaTableName(tableName)) { 778 return CompletableFuture.completedFuture(true); 779 } 780 CompletableFuture<Boolean> future = new CompletableFuture<>(); 781 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 782 (tableState, error) -> { 783 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 784 TableState.State.ENABLED, tableName); 785 }); 786 return future; 787 } 788 789 @Override 790 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 791 if (TableName.isMetaTableName(tableName)) { 792 return CompletableFuture.completedFuture(false); 793 } 794 CompletableFuture<Boolean> future = new CompletableFuture<>(); 795 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 796 (tableState, error) -> { 797 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 798 TableState.State.DISABLED, tableName); 799 }); 800 return future; 801 } 802 803 @Override 804 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 805 if (TableName.isMetaTableName(tableName)) { 806 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 807 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 808 } 809 CompletableFuture<Boolean> future = new CompletableFuture<>(); 810 addListener(isTableEnabled(tableName), (enabled, error) -> { 811 if (error != null) { 812 if (error instanceof TableNotFoundException) { 813 future.complete(false); 814 } else { 815 future.completeExceptionally(error); 816 } 817 return; 818 } 819 if (!enabled) { 820 future.complete(false); 821 } else { 822 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 823 (locations, error1) -> { 824 if (error1 != null) { 825 future.completeExceptionally(error1); 826 return; 827 } 828 List<HRegionLocation> notDeployedRegions = locations.stream() 829 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 830 if (notDeployedRegions.size() > 0) { 831 if (LOG.isDebugEnabled()) { 832 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 833 } 834 future.complete(false); 835 return; 836 } 837 future.complete(true); 838 }); 839 } 840 }); 841 return future; 842 } 843 844 @Override 845 public CompletableFuture<Void> addColumnFamily(TableName tableName, 846 ColumnFamilyDescriptor columnFamily) { 847 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 848 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 849 ng.newNonce()), 850 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 851 new AddColumnFamilyProcedureBiConsumer(tableName)); 852 } 853 854 @Override 855 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 856 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 857 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 858 ng.newNonce()), 859 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 860 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 861 } 862 863 @Override 864 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 865 ColumnFamilyDescriptor columnFamily) { 866 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 867 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 868 ng.newNonce()), 869 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 870 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 871 } 872 873 @Override 874 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 875 byte[] family, String dstSFT) { 876 return this.<ModifyColumnStoreFileTrackerRequest, 877 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 878 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 879 ng.getNonceGroup(), ng.newNonce()), 880 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 881 (resp) -> resp.getProcId(), 882 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 883 } 884 885 @Override 886 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 887 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 888 RequestConverter.buildCreateNamespaceRequest(descriptor), 889 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 890 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 891 } 892 893 @Override 894 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 895 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 896 RequestConverter.buildModifyNamespaceRequest(descriptor), 897 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 898 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 899 } 900 901 @Override 902 public CompletableFuture<Void> deleteNamespace(String name) { 903 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 904 RequestConverter.buildDeleteNamespaceRequest(name), 905 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 906 new DeleteNamespaceProcedureBiConsumer(name)); 907 } 908 909 @Override 910 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 911 return this.<NamespaceDescriptor> newMasterCaller() 912 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 913 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 914 RequestConverter.buildGetNamespaceDescriptorRequest(name), 915 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 916 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 917 .call(); 918 } 919 920 @Override 921 public CompletableFuture<List<String>> listNamespaces() { 922 return this.<List<String>> newMasterCaller() 923 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 924 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 925 (s, c, req, done) -> s.listNamespaces(c, req, done), 926 (resp) -> resp.getNamespaceNameList())) 927 .call(); 928 } 929 930 @Override 931 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 932 return this.<List<NamespaceDescriptor>> newMasterCaller() 933 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 934 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 935 ListNamespaceDescriptorsRequest.newBuilder().build(), 936 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 937 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 938 .call(); 939 } 940 941 @Override 942 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 943 return this.<List<RegionInfo>> newAdminCaller() 944 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 945 List<RegionInfo>> adminCall(controller, stub, 946 RequestConverter.buildGetOnlineRegionRequest(), 947 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 948 resp -> ProtobufUtil.getRegionInfos(resp))) 949 .serverName(serverName).call(); 950 } 951 952 @Override 953 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 954 if (tableName.equals(META_TABLE_NAME)) { 955 return connection.registry.getMetaRegionLocations() 956 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 957 .collect(Collectors.toList())); 958 } else { 959 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 960 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 961 } 962 } 963 964 @Override 965 public CompletableFuture<Void> flush(TableName tableName) { 966 return flush(tableName, Collections.emptyList()); 967 } 968 969 @Override 970 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 971 Preconditions.checkNotNull(columnFamily, 972 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 973 return flush(tableName, Collections.singletonList(columnFamily)); 974 } 975 976 @Override 977 public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { 978 // This is for keeping compatibility with old implementation. 979 // If the server version is lower than the client version, it's possible that the 980 // flushTable method is not present in the server side, if so, we need to fall back 981 // to the old implementation. 982 Preconditions.checkNotNull(columnFamilyList, 983 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 984 List<byte[]> columnFamilies = columnFamilyList.stream() 985 .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); 986 FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, 987 ng.getNonceGroup(), ng.newNonce()); 988 CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( 989 tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), 990 (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); 991 CompletableFuture<Void> future = new CompletableFuture<>(); 992 addListener(procFuture, (ret, error) -> { 993 if (error != null) { 994 if ( 995 error instanceof TableNotFoundException || error instanceof TableNotEnabledException 996 || error instanceof NoSuchColumnFamilyException 997 ) { 998 future.completeExceptionally(error); 999 } else if (error instanceof DoNotRetryIOException) { 1000 // usually this is caused by the method is not present on the server or 1001 // the hbase hadoop version does not match the running hadoop version. 1002 // if that happens, we need fall back to the old flush implementation. 1003 LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error); 1004 legacyFlush(future, tableName, columnFamilies); 1005 } else { 1006 future.completeExceptionally(error); 1007 } 1008 } else { 1009 future.complete(ret); 1010 } 1011 }); 1012 return future; 1013 } 1014 1015 private void legacyFlush(CompletableFuture<Void> future, TableName tableName, 1016 List<byte[]> columnFamilies) { 1017 addListener(tableExists(tableName), (exists, err) -> { 1018 if (err != null) { 1019 future.completeExceptionally(err); 1020 } else if (!exists) { 1021 future.completeExceptionally(new TableNotFoundException(tableName)); 1022 } else { 1023 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 1024 if (err2 != null) { 1025 future.completeExceptionally(err2); 1026 } else if (!tableEnabled) { 1027 future.completeExceptionally(new TableNotEnabledException(tableName)); 1028 } else { 1029 Map<String, String> props = new HashMap<>(); 1030 if (columnFamilies != null && !columnFamilies.isEmpty()) { 1031 props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER 1032 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); 1033 } 1034 addListener( 1035 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 1036 (ret, err3) -> { 1037 if (err3 != null) { 1038 future.completeExceptionally(err3); 1039 } else { 1040 future.complete(ret); 1041 } 1042 }); 1043 } 1044 }); 1045 } 1046 }); 1047 } 1048 1049 @Override 1050 public CompletableFuture<Void> flushRegion(byte[] regionName) { 1051 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 1052 }); 1053 } 1054 1055 @Override 1056 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 1057 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1058 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 1059 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 1060 }); 1061 } 1062 1063 /** 1064 * This method is for internal use only, where we need the response of the flush. 1065 * <p/> 1066 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1067 * API. 1068 */ 1069 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1070 boolean writeFlushWALMarker) { 1071 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1072 addListener(getRegionLocation(regionName), (location, err) -> { 1073 if (err != null) { 1074 future.completeExceptionally(err); 1075 return; 1076 } 1077 ServerName serverName = location.getServerName(); 1078 if (serverName == null) { 1079 future 1080 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1081 return; 1082 } 1083 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1084 (ret, err2) -> { 1085 if (err2 != null) { 1086 future.completeExceptionally(err2); 1087 } else { 1088 future.complete(ret); 1089 } 1090 }); 1091 }); 1092 return future; 1093 } 1094 1095 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1096 byte[] columnFamily, boolean writeFlushWALMarker) { 1097 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1098 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1099 FlushRegionResponse> adminCall(controller, stub, 1100 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1101 writeFlushWALMarker), 1102 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1103 .call(); 1104 } 1105 1106 @Override 1107 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1108 CompletableFuture<Void> future = new CompletableFuture<>(); 1109 addListener(getRegions(sn), (hRegionInfos, err) -> { 1110 if (err != null) { 1111 future.completeExceptionally(err); 1112 return; 1113 } 1114 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1115 if (hRegionInfos != null) { 1116 hRegionInfos 1117 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1118 }))); 1119 } 1120 addListener(CompletableFuture.allOf( 1121 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1122 if (err2 != null) { 1123 future.completeExceptionally(err2); 1124 } else { 1125 future.complete(ret); 1126 } 1127 }); 1128 }); 1129 return future; 1130 } 1131 1132 @Override 1133 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1134 return compact(tableName, null, false, compactType); 1135 } 1136 1137 @Override 1138 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1139 CompactType compactType) { 1140 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1141 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1142 return compact(tableName, columnFamily, false, compactType); 1143 } 1144 1145 @Override 1146 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1147 return compactRegion(regionName, null, false); 1148 } 1149 1150 @Override 1151 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1152 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1153 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1154 return compactRegion(regionName, columnFamily, false); 1155 } 1156 1157 @Override 1158 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1159 return compact(tableName, null, true, compactType); 1160 } 1161 1162 @Override 1163 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1164 CompactType compactType) { 1165 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1166 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1167 return compact(tableName, columnFamily, true, compactType); 1168 } 1169 1170 @Override 1171 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1172 return compactRegion(regionName, null, true); 1173 } 1174 1175 @Override 1176 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1177 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1178 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1179 return compactRegion(regionName, columnFamily, true); 1180 } 1181 1182 @Override 1183 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1184 return compactRegionServer(sn, false); 1185 } 1186 1187 @Override 1188 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1189 return compactRegionServer(sn, true); 1190 } 1191 1192 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1193 CompletableFuture<Void> future = new CompletableFuture<>(); 1194 addListener(getRegions(sn), (hRegionInfos, err) -> { 1195 if (err != null) { 1196 future.completeExceptionally(err); 1197 return; 1198 } 1199 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1200 if (hRegionInfos != null) { 1201 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1202 } 1203 addListener(CompletableFuture.allOf( 1204 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1205 if (err2 != null) { 1206 future.completeExceptionally(err2); 1207 } else { 1208 future.complete(ret); 1209 } 1210 }); 1211 }); 1212 return future; 1213 } 1214 1215 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1216 boolean major) { 1217 CompletableFuture<Void> future = new CompletableFuture<>(); 1218 addListener(getRegionLocation(regionName), (location, err) -> { 1219 if (err != null) { 1220 future.completeExceptionally(err); 1221 return; 1222 } 1223 ServerName serverName = location.getServerName(); 1224 if (serverName == null) { 1225 future 1226 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1227 return; 1228 } 1229 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1230 (ret, err2) -> { 1231 if (err2 != null) { 1232 future.completeExceptionally(err2); 1233 } else { 1234 future.complete(ret); 1235 } 1236 }); 1237 }); 1238 return future; 1239 } 1240 1241 /** 1242 * List all region locations for the specific table. 1243 */ 1244 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1245 if (TableName.META_TABLE_NAME.equals(tableName)) { 1246 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1247 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1248 if (err != null) { 1249 future.completeExceptionally(err); 1250 } else if ( 1251 metaRegions == null || metaRegions.isEmpty() 1252 || metaRegions.getDefaultRegionLocation() == null 1253 ) { 1254 future.completeExceptionally(new IOException("meta region does not found")); 1255 } else { 1256 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1257 } 1258 }); 1259 return future; 1260 } else { 1261 // For non-meta table, we fetch all locations by scanning hbase:meta table 1262 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1263 } 1264 } 1265 1266 /** 1267 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1268 */ 1269 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1270 CompactType compactType) { 1271 CompletableFuture<Void> future = new CompletableFuture<>(); 1272 1273 switch (compactType) { 1274 case MOB: 1275 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1276 if (err != null) { 1277 future.completeExceptionally(err); 1278 return; 1279 } 1280 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1281 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1282 if (err2 != null) { 1283 future.completeExceptionally(err2); 1284 } else { 1285 future.complete(ret); 1286 } 1287 }); 1288 }); 1289 break; 1290 case NORMAL: 1291 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1292 if (err != null) { 1293 future.completeExceptionally(err); 1294 return; 1295 } 1296 if (locations == null || locations.isEmpty()) { 1297 future.completeExceptionally(new TableNotFoundException(tableName)); 1298 } 1299 CompletableFuture<?>[] compactFutures = 1300 locations.stream().filter(l -> l.getRegion() != null) 1301 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1302 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1303 .toArray(CompletableFuture<?>[]::new); 1304 // future complete unless all of the compact futures are completed. 1305 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1306 if (err2 != null) { 1307 future.completeExceptionally(err2); 1308 } else { 1309 future.complete(ret); 1310 } 1311 }); 1312 }); 1313 break; 1314 default: 1315 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1316 } 1317 return future; 1318 } 1319 1320 /** 1321 * Compact the region at specific region server. 1322 */ 1323 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1324 final boolean major, byte[] columnFamily) { 1325 return this.<Void> newAdminCaller().serverName(sn) 1326 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1327 Void> adminCall(controller, stub, 1328 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1329 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1330 .call(); 1331 } 1332 1333 private byte[] toEncodeRegionName(byte[] regionName) { 1334 return RegionInfo.isEncodedRegionName(regionName) 1335 ? regionName 1336 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1337 } 1338 1339 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1340 CompletableFuture<TableName> result) { 1341 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1342 if (err != null) { 1343 result.completeExceptionally(err); 1344 return; 1345 } 1346 RegionInfo regionInfo = location.getRegion(); 1347 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1348 result.completeExceptionally( 1349 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1350 return; 1351 } 1352 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1353 if (!tableName.get().equals(regionInfo.getTable())) { 1354 // tables of this two region should be same. 1355 result.completeExceptionally( 1356 new IllegalArgumentException("Cannot merge regions from two different tables " 1357 + tableName.get() + " and " + regionInfo.getTable())); 1358 } else { 1359 result.complete(tableName.get()); 1360 } 1361 } 1362 }); 1363 } 1364 1365 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1366 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1367 CompletableFuture<TableName> future = new CompletableFuture<>(); 1368 for (byte[] encodedRegionName : encodedRegionNames) { 1369 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1370 } 1371 return future; 1372 } 1373 1374 @Override 1375 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1376 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1377 } 1378 1379 @Override 1380 public CompletableFuture<Boolean> isMergeEnabled() { 1381 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1382 } 1383 1384 @Override 1385 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1386 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1387 } 1388 1389 @Override 1390 public CompletableFuture<Boolean> isSplitEnabled() { 1391 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1392 } 1393 1394 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1395 MasterSwitchType switchType) { 1396 SetSplitOrMergeEnabledRequest request = 1397 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1398 return this.<Boolean> newMasterCaller() 1399 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1400 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1401 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1402 (resp) -> resp.getPrevValueList().get(0))) 1403 .call(); 1404 } 1405 1406 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1407 IsSplitOrMergeEnabledRequest request = 1408 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1409 return this.<Boolean> newMasterCaller() 1410 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1411 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1412 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1413 .call(); 1414 } 1415 1416 @Override 1417 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1418 if (nameOfRegionsToMerge.size() < 2) { 1419 return failedFuture(new IllegalArgumentException( 1420 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1421 } 1422 CompletableFuture<Void> future = new CompletableFuture<>(); 1423 byte[][] encodedNameOfRegionsToMerge = 1424 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1425 1426 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1427 if (err != null) { 1428 future.completeExceptionally(err); 1429 return; 1430 } 1431 1432 final MergeTableRegionsRequest request; 1433 try { 1434 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1435 forcible, ng.getNonceGroup(), ng.newNonce()); 1436 } catch (DeserializationException e) { 1437 future.completeExceptionally(e); 1438 return; 1439 } 1440 1441 addListener( 1442 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1443 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1444 (ret, err2) -> { 1445 if (err2 != null) { 1446 future.completeExceptionally(err2); 1447 } else { 1448 future.complete(ret); 1449 } 1450 }); 1451 }); 1452 return future; 1453 } 1454 1455 @Override 1456 public CompletableFuture<Void> split(TableName tableName) { 1457 CompletableFuture<Void> future = new CompletableFuture<>(); 1458 addListener(tableExists(tableName), (exist, error) -> { 1459 if (error != null) { 1460 future.completeExceptionally(error); 1461 return; 1462 } 1463 if (!exist) { 1464 future.completeExceptionally(new TableNotFoundException(tableName)); 1465 return; 1466 } 1467 addListener(metaTable 1468 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1469 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1470 ClientMetaTableAccessor.QueryType.REGION)) 1471 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1472 ClientMetaTableAccessor.QueryType.REGION))), 1473 (results, err2) -> { 1474 if (err2 != null) { 1475 future.completeExceptionally(err2); 1476 return; 1477 } 1478 if (results != null && !results.isEmpty()) { 1479 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1480 for (Result r : results) { 1481 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1482 continue; 1483 } 1484 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1485 if (rl != null) { 1486 for (HRegionLocation h : rl.getRegionLocations()) { 1487 if (h != null && h.getServerName() != null) { 1488 RegionInfo hri = h.getRegion(); 1489 if ( 1490 hri == null || hri.isSplitParent() 1491 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1492 ) { 1493 continue; 1494 } 1495 splitFutures.add(split(hri, null)); 1496 } 1497 } 1498 } 1499 } 1500 addListener( 1501 CompletableFuture 1502 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1503 (ret, exception) -> { 1504 if (exception != null) { 1505 future.completeExceptionally(exception); 1506 return; 1507 } 1508 future.complete(ret); 1509 }); 1510 } else { 1511 future.complete(null); 1512 } 1513 }); 1514 }); 1515 return future; 1516 } 1517 1518 @Override 1519 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1520 CompletableFuture<Void> result = new CompletableFuture<>(); 1521 if (splitPoint == null) { 1522 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1523 } 1524 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1525 (loc, err) -> { 1526 if (err != null) { 1527 result.completeExceptionally(err); 1528 } else if (loc == null || loc.getRegion() == null) { 1529 result.completeExceptionally(new IllegalArgumentException( 1530 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1531 } else { 1532 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1533 if (err2 != null) { 1534 result.completeExceptionally(err2); 1535 } else { 1536 result.complete(ret); 1537 } 1538 1539 }); 1540 } 1541 }); 1542 return result; 1543 } 1544 1545 @Override 1546 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1547 CompletableFuture<Void> future = new CompletableFuture<>(); 1548 addListener(getRegionLocation(regionName), (location, err) -> { 1549 if (err != null) { 1550 future.completeExceptionally(err); 1551 return; 1552 } 1553 RegionInfo regionInfo = location.getRegion(); 1554 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1555 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1556 + "Replicas are auto-split when their primary is split.")); 1557 return; 1558 } 1559 ServerName serverName = location.getServerName(); 1560 if (serverName == null) { 1561 future 1562 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1563 return; 1564 } 1565 addListener(split(regionInfo, null), (ret, err2) -> { 1566 if (err2 != null) { 1567 future.completeExceptionally(err2); 1568 } else { 1569 future.complete(ret); 1570 } 1571 }); 1572 }); 1573 return future; 1574 } 1575 1576 @Override 1577 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1578 Preconditions.checkNotNull(splitPoint, 1579 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1580 CompletableFuture<Void> future = new CompletableFuture<>(); 1581 addListener(getRegionLocation(regionName), (location, err) -> { 1582 if (err != null) { 1583 future.completeExceptionally(err); 1584 return; 1585 } 1586 RegionInfo regionInfo = location.getRegion(); 1587 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1588 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1589 + "Replicas are auto-split when their primary is split.")); 1590 return; 1591 } 1592 ServerName serverName = location.getServerName(); 1593 if (serverName == null) { 1594 future 1595 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1596 return; 1597 } 1598 if ( 1599 regionInfo.getStartKey() != null 1600 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1601 ) { 1602 future.completeExceptionally( 1603 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1604 return; 1605 } 1606 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1607 if (err2 != null) { 1608 future.completeExceptionally(err2); 1609 } else { 1610 future.complete(ret); 1611 } 1612 }); 1613 }); 1614 return future; 1615 } 1616 1617 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1618 CompletableFuture<Void> future = new CompletableFuture<>(); 1619 TableName tableName = hri.getTable(); 1620 final SplitTableRegionRequest request; 1621 try { 1622 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1623 ng.newNonce()); 1624 } catch (DeserializationException e) { 1625 future.completeExceptionally(e); 1626 return future; 1627 } 1628 1629 addListener( 1630 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1631 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1632 (ret, err2) -> { 1633 if (err2 != null) { 1634 future.completeExceptionally(err2); 1635 } else { 1636 future.complete(ret); 1637 } 1638 }); 1639 return future; 1640 } 1641 1642 @Override 1643 public CompletableFuture<Void> truncateRegion(byte[] regionName) { 1644 CompletableFuture<Void> future = new CompletableFuture<>(); 1645 addListener(getRegionLocation(regionName), (location, err) -> { 1646 if (err != null) { 1647 future.completeExceptionally(err); 1648 return; 1649 } 1650 RegionInfo regionInfo = location.getRegion(); 1651 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1652 future.completeExceptionally(new IllegalArgumentException( 1653 "Can't truncate replicas directly.Replicas are auto-truncated " 1654 + "when their primary is truncated.")); 1655 return; 1656 } 1657 ServerName serverName = location.getServerName(); 1658 if (serverName == null) { 1659 future 1660 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1661 return; 1662 } 1663 addListener(truncateRegion(regionInfo), (ret, err2) -> { 1664 if (err2 != null) { 1665 future.completeExceptionally(err2); 1666 } else { 1667 future.complete(ret); 1668 } 1669 }); 1670 }); 1671 return future; 1672 } 1673 1674 private CompletableFuture<Void> truncateRegion(final RegionInfo hri) { 1675 CompletableFuture<Void> future = new CompletableFuture<>(); 1676 TableName tableName = hri.getTable(); 1677 final MasterProtos.TruncateRegionRequest request; 1678 try { 1679 request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce()); 1680 } catch (DeserializationException e) { 1681 future.completeExceptionally(e); 1682 return future; 1683 } 1684 addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion, 1685 MasterProtos.TruncateRegionResponse::getProcId, 1686 new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1687 if (err2 != null) { 1688 future.completeExceptionally(err2); 1689 } else { 1690 future.complete(ret); 1691 } 1692 }); 1693 return future; 1694 } 1695 1696 @Override 1697 public CompletableFuture<Void> assign(byte[] regionName) { 1698 CompletableFuture<Void> future = new CompletableFuture<>(); 1699 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1700 if (err != null) { 1701 future.completeExceptionally(err); 1702 return; 1703 } 1704 addListener( 1705 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1706 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1707 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1708 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1709 .call(), 1710 (ret, err2) -> { 1711 if (err2 != null) { 1712 future.completeExceptionally(err2); 1713 } else { 1714 future.complete(ret); 1715 } 1716 }); 1717 }); 1718 return future; 1719 } 1720 1721 @Override 1722 public CompletableFuture<Void> unassign(byte[] regionName) { 1723 CompletableFuture<Void> future = new CompletableFuture<>(); 1724 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1725 if (err != null) { 1726 future.completeExceptionally(err); 1727 return; 1728 } 1729 addListener( 1730 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1731 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1732 Void> call(controller, stub, 1733 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1734 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1735 .call(), 1736 (ret, err2) -> { 1737 if (err2 != null) { 1738 future.completeExceptionally(err2); 1739 } else { 1740 future.complete(ret); 1741 } 1742 }); 1743 }); 1744 return future; 1745 } 1746 1747 @Override 1748 public CompletableFuture<Void> offline(byte[] regionName) { 1749 CompletableFuture<Void> future = new CompletableFuture<>(); 1750 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1751 if (err != null) { 1752 future.completeExceptionally(err); 1753 return; 1754 } 1755 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1756 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1757 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1758 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1759 .call(), (ret, err2) -> { 1760 if (err2 != null) { 1761 future.completeExceptionally(err2); 1762 } else { 1763 future.complete(ret); 1764 } 1765 }); 1766 }); 1767 return future; 1768 } 1769 1770 @Override 1771 public CompletableFuture<Void> move(byte[] regionName) { 1772 CompletableFuture<Void> future = new CompletableFuture<>(); 1773 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1774 if (err != null) { 1775 future.completeExceptionally(err); 1776 return; 1777 } 1778 addListener( 1779 moveRegion(regionInfo, 1780 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1781 (ret, err2) -> { 1782 if (err2 != null) { 1783 future.completeExceptionally(err2); 1784 } else { 1785 future.complete(ret); 1786 } 1787 }); 1788 }); 1789 return future; 1790 } 1791 1792 @Override 1793 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1794 Preconditions.checkNotNull(destServerName, 1795 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1796 CompletableFuture<Void> future = new CompletableFuture<>(); 1797 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1798 if (err != null) { 1799 future.completeExceptionally(err); 1800 return; 1801 } 1802 addListener( 1803 moveRegion(regionInfo, RequestConverter 1804 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1805 (ret, err2) -> { 1806 if (err2 != null) { 1807 future.completeExceptionally(err2); 1808 } else { 1809 future.complete(ret); 1810 } 1811 }); 1812 }); 1813 return future; 1814 } 1815 1816 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1817 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1818 .action( 1819 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1820 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1821 .call(); 1822 } 1823 1824 @Override 1825 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1826 return this.<Void> newMasterCaller() 1827 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1828 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1829 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1830 .call(); 1831 } 1832 1833 @Override 1834 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1835 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1836 Scan scan = QuotaTableUtil.makeScan(filter); 1837 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1838 new AdvancedScanResultConsumer() { 1839 List<QuotaSettings> settings = new ArrayList<>(); 1840 1841 @Override 1842 public void onNext(Result[] results, ScanController controller) { 1843 for (Result result : results) { 1844 try { 1845 QuotaTableUtil.parseResultToCollection(result, settings); 1846 } catch (IOException e) { 1847 controller.terminate(); 1848 future.completeExceptionally(e); 1849 } 1850 } 1851 } 1852 1853 @Override 1854 public void onError(Throwable error) { 1855 future.completeExceptionally(error); 1856 } 1857 1858 @Override 1859 public void onComplete() { 1860 future.complete(settings); 1861 } 1862 }); 1863 return future; 1864 } 1865 1866 @Override 1867 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1868 boolean enabled) { 1869 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1870 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1871 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1872 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1873 } 1874 1875 @Override 1876 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1877 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1878 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1879 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1880 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1881 } 1882 1883 @Override 1884 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1885 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1886 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1887 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1888 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1889 } 1890 1891 @Override 1892 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1893 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1894 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1895 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1896 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1897 } 1898 1899 @Override 1900 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1901 return this.<ReplicationPeerConfig> newMasterCaller() 1902 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1903 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1904 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1905 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1906 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1907 .call(); 1908 } 1909 1910 @Override 1911 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1912 ReplicationPeerConfig peerConfig) { 1913 return this.<UpdateReplicationPeerConfigRequest, 1914 UpdateReplicationPeerConfigResponse> procedureCall( 1915 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1916 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1917 (resp) -> resp.getProcId(), 1918 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1919 } 1920 1921 @Override 1922 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1923 SyncReplicationState clusterState) { 1924 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1925 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1926 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1927 clusterState), 1928 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1929 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1930 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1931 } 1932 1933 @Override 1934 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1935 Map<TableName, List<String>> tableCfs) { 1936 if (tableCfs == null) { 1937 return failedFuture(new ReplicationException("tableCfs is null")); 1938 } 1939 1940 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1941 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1942 if (!completeExceptionally(future, error)) { 1943 ReplicationPeerConfig newPeerConfig = 1944 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1945 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1946 if (!completeExceptionally(future, error)) { 1947 future.complete(result); 1948 } 1949 }); 1950 } 1951 }); 1952 return future; 1953 } 1954 1955 @Override 1956 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1957 Map<TableName, List<String>> tableCfs) { 1958 if (tableCfs == null) { 1959 return failedFuture(new ReplicationException("tableCfs is null")); 1960 } 1961 1962 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1963 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1964 if (!completeExceptionally(future, error)) { 1965 ReplicationPeerConfig newPeerConfig = null; 1966 try { 1967 newPeerConfig = ReplicationPeerConfigUtil 1968 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1969 } catch (ReplicationException e) { 1970 future.completeExceptionally(e); 1971 return; 1972 } 1973 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1974 if (!completeExceptionally(future, error)) { 1975 future.complete(result); 1976 } 1977 }); 1978 } 1979 }); 1980 return future; 1981 } 1982 1983 @Override 1984 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1985 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1986 } 1987 1988 @Override 1989 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1990 Preconditions.checkNotNull(pattern, 1991 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1992 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1993 } 1994 1995 private CompletableFuture<List<ReplicationPeerDescription>> 1996 listReplicationPeers(ListReplicationPeersRequest request) { 1997 return this.<List<ReplicationPeerDescription>> newMasterCaller() 1998 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1999 List<ReplicationPeerDescription>> call(controller, stub, request, 2000 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 2001 (resp) -> resp.getPeerDescList().stream() 2002 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 2003 .collect(Collectors.toList()))) 2004 .call(); 2005 } 2006 2007 @Override 2008 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 2009 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 2010 addListener(listTableDescriptors(), (tables, error) -> { 2011 if (!completeExceptionally(future, error)) { 2012 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 2013 tables.forEach(table -> { 2014 Map<String, Integer> cfs = new HashMap<>(); 2015 Stream.of(table.getColumnFamilies()) 2016 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 2017 .forEach(column -> { 2018 cfs.put(column.getNameAsString(), column.getScope()); 2019 }); 2020 if (!cfs.isEmpty()) { 2021 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 2022 } 2023 }); 2024 future.complete(replicatedTableCFs); 2025 } 2026 }); 2027 return future; 2028 } 2029 2030 @Override 2031 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 2032 SnapshotProtos.SnapshotDescription snapshot = 2033 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2034 try { 2035 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2036 } catch (IllegalArgumentException e) { 2037 return failedFuture(e); 2038 } 2039 CompletableFuture<Void> future = new CompletableFuture<>(); 2040 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2041 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 2042 addListener(this.<SnapshotResponse> newMasterCaller() 2043 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 2044 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 2045 .call(), (resp, err) -> { 2046 if (err != null) { 2047 future.completeExceptionally(err); 2048 return; 2049 } 2050 waitSnapshotFinish(snapshotDesc, future, resp); 2051 }); 2052 return future; 2053 } 2054 2055 // This is for keeping compatibility with old implementation. 2056 // If there is a procId field in the response, then the snapshot will be operated with a 2057 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 2058 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 2059 SnapshotResponse resp) { 2060 if (resp.hasProcId()) { 2061 getProcedureResult(resp.getProcId(), future, 0); 2062 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 2063 } else { 2064 long expectedTimeout = resp.getExpectedTimeout(); 2065 TimerTask pollingTask = new TimerTask() { 2066 int tries = 0; 2067 long startTime = EnvironmentEdgeManager.currentTime(); 2068 long endTime = startTime + expectedTimeout; 2069 long maxPauseTime = expectedTimeout / maxAttempts; 2070 2071 @Override 2072 public void run(Timeout timeout) throws Exception { 2073 if (EnvironmentEdgeManager.currentTime() < endTime) { 2074 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 2075 if (err2 != null) { 2076 future.completeExceptionally(err2); 2077 } else if (done) { 2078 future.complete(null); 2079 } else { 2080 // retry again after pauseTime. 2081 long pauseTime = 2082 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2083 pauseTime = Math.min(pauseTime, maxPauseTime); 2084 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 2085 } 2086 }); 2087 } else { 2088 future 2089 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 2090 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 2091 } 2092 } 2093 }; 2094 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2095 } 2096 } 2097 2098 @Override 2099 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 2100 return this.<Boolean> newMasterCaller() 2101 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 2102 Boolean> call(controller, stub, 2103 IsSnapshotDoneRequest.newBuilder() 2104 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2105 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 2106 .call(); 2107 } 2108 2109 @Override 2110 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 2111 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2112 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2113 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2114 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2115 } 2116 2117 @Override 2118 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2119 boolean restoreAcl) { 2120 CompletableFuture<Void> future = new CompletableFuture<>(); 2121 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2122 if (err != null) { 2123 future.completeExceptionally(err); 2124 return; 2125 } 2126 TableName tableName = null; 2127 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2128 for (SnapshotDescription snap : snapshotDescriptions) { 2129 if (snap.getName().equals(snapshotName)) { 2130 tableName = snap.getTableName(); 2131 break; 2132 } 2133 } 2134 } 2135 if (tableName == null) { 2136 future.completeExceptionally( 2137 new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist.")); 2138 return; 2139 } 2140 final TableName finalTableName = tableName; 2141 addListener(tableExists(finalTableName), (exists, err2) -> { 2142 if (err2 != null) { 2143 future.completeExceptionally(err2); 2144 } else if (!exists) { 2145 // if table does not exist, then just clone snapshot into new table. 2146 completeConditionalOnFuture(future, 2147 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2148 } else { 2149 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2150 if (err4 != null) { 2151 future.completeExceptionally(err4); 2152 } else if (!disabled) { 2153 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2154 } else { 2155 completeConditionalOnFuture(future, 2156 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2157 } 2158 }); 2159 } 2160 }); 2161 }); 2162 return future; 2163 } 2164 2165 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2166 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2167 if (takeFailSafeSnapshot) { 2168 CompletableFuture<Void> future = new CompletableFuture<>(); 2169 // Step.1 Take a snapshot of the current state 2170 String failSafeSnapshotSnapshotNameFormat = 2171 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2172 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2173 final String failSafeSnapshotSnapshotName = 2174 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2175 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2176 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2177 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2178 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2179 if (err != null) { 2180 future.completeExceptionally(err); 2181 } else { 2182 // Step.2 Restore snapshot 2183 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2184 (void2, err2) -> { 2185 if (err2 != null) { 2186 // Step.3.a Something went wrong during the restore and try to rollback. 2187 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2188 restoreAcl, null), (void3, err3) -> { 2189 if (err3 != null) { 2190 future.completeExceptionally(err3); 2191 } else { 2192 // If fail to restore snapshot but rollback successfully, delete the 2193 // restore-failsafe snapshot. 2194 LOG.info( 2195 "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2196 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> { 2197 if (err4 != null) { 2198 LOG.error("Unable to remove the failsafe snapshot: {}", 2199 failSafeSnapshotSnapshotName, err4); 2200 } 2201 String msg = 2202 "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot=" 2203 + failSafeSnapshotSnapshotName + " succeeded."; 2204 LOG.error(msg); 2205 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2206 }); 2207 } 2208 }); 2209 } else { 2210 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 2211 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2212 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2213 if (err3 != null) { 2214 LOG.error( 2215 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2216 err3); 2217 } 2218 future.complete(ret3); 2219 }); 2220 } 2221 }); 2222 } 2223 }); 2224 return future; 2225 } else { 2226 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2227 } 2228 } 2229 2230 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2231 CompletableFuture<T> parentFuture) { 2232 addListener(parentFuture, (res, err) -> { 2233 if (err != null) { 2234 dependentFuture.completeExceptionally(err); 2235 } else { 2236 dependentFuture.complete(res); 2237 } 2238 }); 2239 } 2240 2241 @Override 2242 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2243 boolean restoreAcl, String customSFT) { 2244 CompletableFuture<Void> future = new CompletableFuture<>(); 2245 addListener(tableExists(tableName), (exists, err) -> { 2246 if (err != null) { 2247 future.completeExceptionally(err); 2248 } else if (exists) { 2249 future.completeExceptionally(new TableExistsException(tableName)); 2250 } else { 2251 completeConditionalOnFuture(future, 2252 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2253 } 2254 }); 2255 return future; 2256 } 2257 2258 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2259 boolean restoreAcl, String customSFT) { 2260 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2261 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2262 try { 2263 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2264 } catch (IllegalArgumentException e) { 2265 return failedFuture(e); 2266 } 2267 RestoreSnapshotRequest.Builder builder = 2268 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2269 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2270 if (customSFT != null) { 2271 builder.setCustomSFT(customSFT); 2272 } 2273 return waitProcedureResult(this.<Long> newMasterCaller() 2274 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2275 Long> call(controller, stub, builder.build(), 2276 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2277 .call()); 2278 } 2279 2280 @Override 2281 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2282 return getCompletedSnapshots(null); 2283 } 2284 2285 @Override 2286 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2287 Preconditions.checkNotNull(pattern, 2288 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2289 return getCompletedSnapshots(pattern); 2290 } 2291 2292 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2293 return this.<List<SnapshotDescription>> newMasterCaller() 2294 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2295 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2296 GetCompletedSnapshotsRequest.newBuilder().build(), 2297 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2298 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2299 .call(); 2300 } 2301 2302 @Override 2303 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2304 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2305 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2306 return getCompletedSnapshots(tableNamePattern, null); 2307 } 2308 2309 @Override 2310 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2311 Pattern snapshotNamePattern) { 2312 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2313 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2314 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2315 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2316 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2317 } 2318 2319 private CompletableFuture<List<SnapshotDescription>> 2320 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2321 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2322 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2323 if (err != null) { 2324 future.completeExceptionally(err); 2325 return; 2326 } 2327 if (tableNames == null || tableNames.size() <= 0) { 2328 future.complete(Collections.emptyList()); 2329 return; 2330 } 2331 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2332 if (err2 != null) { 2333 future.completeExceptionally(err2); 2334 return; 2335 } 2336 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2337 future.complete(Collections.emptyList()); 2338 return; 2339 } 2340 future.complete(snapshotDescList.stream() 2341 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2342 .collect(Collectors.toList())); 2343 }); 2344 }); 2345 return future; 2346 } 2347 2348 @Override 2349 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2350 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2351 } 2352 2353 @Override 2354 public CompletableFuture<Void> deleteSnapshots() { 2355 return internalDeleteSnapshots(null, null); 2356 } 2357 2358 @Override 2359 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2360 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2361 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2362 return internalDeleteSnapshots(null, snapshotNamePattern); 2363 } 2364 2365 @Override 2366 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2367 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2368 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2369 return internalDeleteSnapshots(tableNamePattern, null); 2370 } 2371 2372 @Override 2373 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2374 Pattern snapshotNamePattern) { 2375 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2376 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2377 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2378 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2379 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2380 } 2381 2382 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2383 Pattern snapshotNamePattern) { 2384 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2385 if (tableNamePattern == null) { 2386 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2387 } else { 2388 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2389 } 2390 CompletableFuture<Void> future = new CompletableFuture<>(); 2391 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2392 if (err != null) { 2393 future.completeExceptionally(err); 2394 return; 2395 } 2396 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2397 future.complete(null); 2398 return; 2399 } 2400 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2401 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2402 if (e != null) { 2403 future.completeExceptionally(e); 2404 } else { 2405 future.complete(v); 2406 } 2407 }); 2408 }); 2409 return future; 2410 } 2411 2412 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2413 return this.<Void> newMasterCaller() 2414 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2415 controller, stub, 2416 DeleteSnapshotRequest.newBuilder() 2417 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2418 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2419 .call(); 2420 } 2421 2422 @Override 2423 public CompletableFuture<Void> execProcedure(String signature, String instance, 2424 Map<String, String> props) { 2425 CompletableFuture<Void> future = new CompletableFuture<>(); 2426 ProcedureDescription procDesc = 2427 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2428 addListener(this.<Long> newMasterCaller() 2429 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2430 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2431 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2432 .call(), (expectedTimeout, err) -> { 2433 if (err != null) { 2434 future.completeExceptionally(err); 2435 return; 2436 } 2437 TimerTask pollingTask = new TimerTask() { 2438 int tries = 0; 2439 long startTime = EnvironmentEdgeManager.currentTime(); 2440 long endTime = startTime + expectedTimeout; 2441 long maxPauseTime = expectedTimeout / maxAttempts; 2442 2443 @Override 2444 public void run(Timeout timeout) throws Exception { 2445 if (EnvironmentEdgeManager.currentTime() < endTime) { 2446 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2447 if (err2 != null) { 2448 future.completeExceptionally(err2); 2449 return; 2450 } 2451 if (done) { 2452 future.complete(null); 2453 } else { 2454 // retry again after pauseTime. 2455 long pauseTime = 2456 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2457 pauseTime = Math.min(pauseTime, maxPauseTime); 2458 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2459 TimeUnit.MICROSECONDS); 2460 } 2461 }); 2462 } else { 2463 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2464 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2465 } 2466 } 2467 }; 2468 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2469 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2470 }); 2471 return future; 2472 } 2473 2474 @Override 2475 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2476 Map<String, String> props) { 2477 ProcedureDescription proDesc = 2478 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2479 return this.<byte[]> newMasterCaller() 2480 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2481 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2482 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2483 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2484 .call(); 2485 } 2486 2487 @Override 2488 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2489 Map<String, String> props) { 2490 ProcedureDescription proDesc = 2491 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2492 return this.<Boolean> newMasterCaller() 2493 .action( 2494 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2495 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2496 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2497 .call(); 2498 } 2499 2500 @Override 2501 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2502 return this.<Boolean> newMasterCaller().action( 2503 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2504 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2505 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2506 .call(); 2507 } 2508 2509 @Override 2510 public CompletableFuture<String> getProcedures() { 2511 return this.<String> newMasterCaller() 2512 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2513 controller, stub, GetProceduresRequest.newBuilder().build(), 2514 (s, c, req, done) -> s.getProcedures(c, req, done), 2515 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2516 .call(); 2517 } 2518 2519 @Override 2520 public CompletableFuture<String> getLocks() { 2521 return this.<String> newMasterCaller() 2522 .action( 2523 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2524 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2525 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2526 .call(); 2527 } 2528 2529 @Override 2530 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2531 boolean offload) { 2532 return this.<Void> newMasterCaller() 2533 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2534 DecommissionRegionServersResponse, Void> call(controller, stub, 2535 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2536 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2537 .call(); 2538 } 2539 2540 @Override 2541 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2542 return this.<List<ServerName>> newMasterCaller() 2543 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2544 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2545 ListDecommissionedRegionServersRequest.newBuilder().build(), 2546 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2547 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2548 .collect(Collectors.toList()))) 2549 .call(); 2550 } 2551 2552 @Override 2553 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2554 List<byte[]> encodedRegionNames) { 2555 return this.<Void> newMasterCaller() 2556 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2557 RecommissionRegionServerResponse, Void> call(controller, stub, 2558 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2559 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2560 .call(); 2561 } 2562 2563 /** 2564 * Get the region location for the passed region name. The region name may be a full region name 2565 * or encoded region name. If the region does not found, then it'll throw an 2566 * UnknownRegionException wrapped by a {@link CompletableFuture} 2567 * @param regionNameOrEncodedRegionName region name or encoded region name 2568 * @return region location, wrapped by a {@link CompletableFuture} 2569 */ 2570 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2571 if (regionNameOrEncodedRegionName == null) { 2572 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2573 } 2574 2575 CompletableFuture<Optional<HRegionLocation>> future; 2576 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2577 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2578 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2579 // old format encodedName, should be meta region 2580 future = connection.registry.getMetaRegionLocations() 2581 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2582 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2583 } else { 2584 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2585 regionNameOrEncodedRegionName); 2586 } 2587 } else { 2588 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2589 // it needs to throw out IllegalArgumentException in case tableName is passed in. 2590 RegionInfo regionInfo; 2591 try { 2592 regionInfo = 2593 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2594 } catch (IOException ioe) { 2595 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2596 } 2597 2598 if (regionInfo.isMetaRegion()) { 2599 future = connection.registry.getMetaRegionLocations() 2600 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2601 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2602 .findFirst()); 2603 } else { 2604 future = 2605 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2606 } 2607 } 2608 2609 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2610 addListener(future, (location, err) -> { 2611 if (err != null) { 2612 returnedFuture.completeExceptionally(err); 2613 return; 2614 } 2615 if (!location.isPresent() || location.get().getRegion() == null) { 2616 returnedFuture.completeExceptionally( 2617 new UnknownRegionException("Invalid region name or encoded region name: " 2618 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2619 } else { 2620 returnedFuture.complete(location.get()); 2621 } 2622 }); 2623 return returnedFuture; 2624 } 2625 2626 /** 2627 * Get the region info for the passed region name. The region name may be a full region name or 2628 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2629 * wrapped by a {@link CompletableFuture} 2630 * @return region info, wrapped by a {@link CompletableFuture} 2631 */ 2632 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2633 if (regionNameOrEncodedRegionName == null) { 2634 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2635 } 2636 2637 if ( 2638 Bytes.equals(regionNameOrEncodedRegionName, 2639 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2640 || Bytes.equals(regionNameOrEncodedRegionName, 2641 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2642 ) { 2643 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2644 } 2645 2646 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2647 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2648 if (err != null) { 2649 future.completeExceptionally(err); 2650 } else { 2651 future.complete(location.getRegion()); 2652 } 2653 }); 2654 return future; 2655 } 2656 2657 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2658 if (numRegions < 3) { 2659 throw new IllegalArgumentException("Must create at least three regions"); 2660 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2661 throw new IllegalArgumentException("Start key must be smaller than end key"); 2662 } 2663 if (numRegions == 3) { 2664 return new byte[][] { startKey, endKey }; 2665 } 2666 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2667 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2668 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2669 } 2670 return splitKeys; 2671 } 2672 2673 private void verifySplitKeys(byte[][] splitKeys) { 2674 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2675 // Verify there are no duplicate split keys 2676 byte[] lastKey = null; 2677 for (byte[] splitKey : splitKeys) { 2678 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2679 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2680 } 2681 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2682 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2683 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2684 } 2685 lastKey = splitKey; 2686 } 2687 } 2688 2689 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2690 2691 abstract void onFinished(); 2692 2693 abstract void onError(Throwable error); 2694 2695 @Override 2696 public void accept(Void v, Throwable error) { 2697 if (error != null) { 2698 onError(error); 2699 return; 2700 } 2701 onFinished(); 2702 } 2703 } 2704 2705 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2706 protected final TableName tableName; 2707 2708 TableProcedureBiConsumer(TableName tableName) { 2709 this.tableName = tableName; 2710 } 2711 2712 abstract String getOperationType(); 2713 2714 String getDescription() { 2715 return "Operation: " + getOperationType() + ", " + "Table Name: " 2716 + tableName.getNameWithNamespaceInclAsString(); 2717 } 2718 2719 @Override 2720 void onFinished() { 2721 LOG.info(getDescription() + " completed"); 2722 } 2723 2724 @Override 2725 void onError(Throwable error) { 2726 LOG.info(getDescription() + " failed with " + error.getMessage()); 2727 } 2728 } 2729 2730 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2731 protected final String namespaceName; 2732 2733 NamespaceProcedureBiConsumer(String namespaceName) { 2734 this.namespaceName = namespaceName; 2735 } 2736 2737 abstract String getOperationType(); 2738 2739 String getDescription() { 2740 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2741 } 2742 2743 @Override 2744 void onFinished() { 2745 LOG.info(getDescription() + " completed"); 2746 } 2747 2748 @Override 2749 void onError(Throwable error) { 2750 LOG.info(getDescription() + " failed with " + error.getMessage()); 2751 } 2752 } 2753 2754 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2755 2756 CreateTableProcedureBiConsumer(TableName tableName) { 2757 super(tableName); 2758 } 2759 2760 @Override 2761 String getOperationType() { 2762 return "CREATE"; 2763 } 2764 } 2765 2766 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2767 2768 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2769 super(tableName); 2770 } 2771 2772 @Override 2773 String getOperationType() { 2774 return "MODIFY"; 2775 } 2776 } 2777 2778 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2779 extends TableProcedureBiConsumer { 2780 2781 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2782 super(tableName); 2783 } 2784 2785 @Override 2786 String getOperationType() { 2787 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2788 } 2789 } 2790 2791 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2792 2793 DeleteTableProcedureBiConsumer(TableName tableName) { 2794 super(tableName); 2795 } 2796 2797 @Override 2798 String getOperationType() { 2799 return "DELETE"; 2800 } 2801 2802 @Override 2803 void onFinished() { 2804 connection.getLocator().clearCache(this.tableName); 2805 super.onFinished(); 2806 } 2807 } 2808 2809 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2810 2811 TruncateTableProcedureBiConsumer(TableName tableName) { 2812 super(tableName); 2813 } 2814 2815 @Override 2816 String getOperationType() { 2817 return "TRUNCATE"; 2818 } 2819 } 2820 2821 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2822 2823 EnableTableProcedureBiConsumer(TableName tableName) { 2824 super(tableName); 2825 } 2826 2827 @Override 2828 String getOperationType() { 2829 return "ENABLE"; 2830 } 2831 } 2832 2833 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2834 2835 DisableTableProcedureBiConsumer(TableName tableName) { 2836 super(tableName); 2837 } 2838 2839 @Override 2840 String getOperationType() { 2841 return "DISABLE"; 2842 } 2843 } 2844 2845 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2846 2847 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2848 super(tableName); 2849 } 2850 2851 @Override 2852 String getOperationType() { 2853 return "ADD_COLUMN_FAMILY"; 2854 } 2855 } 2856 2857 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2858 2859 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2860 super(tableName); 2861 } 2862 2863 @Override 2864 String getOperationType() { 2865 return "DELETE_COLUMN_FAMILY"; 2866 } 2867 } 2868 2869 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2870 2871 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2872 super(tableName); 2873 } 2874 2875 @Override 2876 String getOperationType() { 2877 return "MODIFY_COLUMN_FAMILY"; 2878 } 2879 } 2880 2881 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2882 extends TableProcedureBiConsumer { 2883 2884 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2885 super(tableName); 2886 } 2887 2888 @Override 2889 String getOperationType() { 2890 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2891 } 2892 } 2893 2894 private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { 2895 2896 FlushTableProcedureBiConsumer(TableName tableName) { 2897 super(tableName); 2898 } 2899 2900 @Override 2901 String getOperationType() { 2902 return "FLUSH"; 2903 } 2904 } 2905 2906 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2907 2908 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2909 super(namespaceName); 2910 } 2911 2912 @Override 2913 String getOperationType() { 2914 return "CREATE_NAMESPACE"; 2915 } 2916 } 2917 2918 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2919 2920 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2921 super(namespaceName); 2922 } 2923 2924 @Override 2925 String getOperationType() { 2926 return "DELETE_NAMESPACE"; 2927 } 2928 } 2929 2930 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2931 2932 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2933 super(namespaceName); 2934 } 2935 2936 @Override 2937 String getOperationType() { 2938 return "MODIFY_NAMESPACE"; 2939 } 2940 } 2941 2942 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2943 2944 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2945 super(tableName); 2946 } 2947 2948 @Override 2949 String getOperationType() { 2950 return "MERGE_REGIONS"; 2951 } 2952 } 2953 2954 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2955 2956 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2957 super(tableName); 2958 } 2959 2960 @Override 2961 String getOperationType() { 2962 return "SPLIT_REGION"; 2963 } 2964 } 2965 2966 private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2967 2968 TruncateRegionProcedureBiConsumer(TableName tableName) { 2969 super(tableName); 2970 } 2971 2972 @Override 2973 String getOperationType() { 2974 return "TRUNCATE_REGION"; 2975 } 2976 } 2977 2978 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 2979 SnapshotProcedureBiConsumer(TableName tableName) { 2980 super(tableName); 2981 } 2982 2983 @Override 2984 String getOperationType() { 2985 return "SNAPSHOT"; 2986 } 2987 } 2988 2989 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2990 private final String peerId; 2991 private final Supplier<String> getOperation; 2992 2993 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2994 this.peerId = peerId; 2995 this.getOperation = getOperation; 2996 } 2997 2998 String getDescription() { 2999 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 3000 } 3001 3002 @Override 3003 void onFinished() { 3004 LOG.info(getDescription() + " completed"); 3005 } 3006 3007 @Override 3008 void onError(Throwable error) { 3009 LOG.info(getDescription() + " failed with " + error.getMessage()); 3010 } 3011 } 3012 3013 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 3014 CompletableFuture<Void> future = new CompletableFuture<>(); 3015 addListener(procFuture, (procId, error) -> { 3016 if (error != null) { 3017 future.completeExceptionally(error); 3018 return; 3019 } 3020 getProcedureResult(procId, future, 0); 3021 }); 3022 return future; 3023 } 3024 3025 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 3026 addListener( 3027 this.<GetProcedureResultResponse> newMasterCaller() 3028 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 3029 GetProcedureResultResponse> call(controller, stub, 3030 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 3031 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 3032 .call(), 3033 (response, error) -> { 3034 if (error != null) { 3035 LOG.warn("failed to get the procedure result procId={}", procId, 3036 ConnectionUtils.translateException(error)); 3037 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 3038 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3039 return; 3040 } 3041 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 3042 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 3043 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3044 return; 3045 } 3046 if (response.hasException()) { 3047 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 3048 future.completeExceptionally(ioe); 3049 } else { 3050 future.complete(null); 3051 } 3052 }); 3053 } 3054 3055 private <T> CompletableFuture<T> failedFuture(Throwable error) { 3056 CompletableFuture<T> future = new CompletableFuture<>(); 3057 future.completeExceptionally(error); 3058 return future; 3059 } 3060 3061 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 3062 if (error != null) { 3063 future.completeExceptionally(error); 3064 return true; 3065 } 3066 return false; 3067 } 3068 3069 @Override 3070 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 3071 return getClusterMetrics(EnumSet.allOf(Option.class)); 3072 } 3073 3074 @Override 3075 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 3076 return this.<ClusterMetrics> newMasterCaller() 3077 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 3078 ClusterMetrics> call(controller, stub, 3079 RequestConverter.buildGetClusterStatusRequest(options), 3080 (s, c, req, done) -> s.getClusterStatus(c, req, done), 3081 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 3082 .call(); 3083 } 3084 3085 @Override 3086 public CompletableFuture<Void> shutdown() { 3087 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3088 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 3089 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 3090 resp -> null)) 3091 .call(); 3092 } 3093 3094 @Override 3095 public CompletableFuture<Void> stopMaster() { 3096 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3097 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 3098 controller, stub, StopMasterRequest.newBuilder().build(), 3099 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 3100 .call(); 3101 } 3102 3103 @Override 3104 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 3105 StopServerRequest request = RequestConverter 3106 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 3107 return this.<Void> newAdminCaller().priority(HIGH_QOS) 3108 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 3109 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 3110 resp -> null)) 3111 .serverName(serverName).call(); 3112 } 3113 3114 @Override 3115 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 3116 return this.<Void> newAdminCaller() 3117 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 3118 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 3119 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 3120 .serverName(serverName).call(); 3121 } 3122 3123 @Override 3124 public CompletableFuture<Void> updateConfiguration() { 3125 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3126 addListener( 3127 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 3128 (status, err) -> { 3129 if (err != null) { 3130 future.completeExceptionally(err); 3131 } else { 3132 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3133 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 3134 futures.add(updateConfiguration(status.getMasterName())); 3135 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 3136 addListener( 3137 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3138 (result, err2) -> { 3139 if (err2 != null) { 3140 future.completeExceptionally(err2); 3141 } else { 3142 future.complete(result); 3143 } 3144 }); 3145 } 3146 }); 3147 return future; 3148 } 3149 3150 @Override 3151 public CompletableFuture<Void> updateConfiguration(String groupName) { 3152 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3153 addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3154 if (err != null) { 3155 future.completeExceptionally(err); 3156 } else if (rsGroupInfo == null) { 3157 future.completeExceptionally( 3158 new IllegalArgumentException("Group does not exist: " + groupName)); 3159 } else { 3160 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3161 if (err2 != null) { 3162 future.completeExceptionally(err2); 3163 } else { 3164 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3165 List<ServerName> groupServers = status.getServersName().stream() 3166 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3167 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3168 addListener( 3169 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3170 (result, err3) -> { 3171 if (err3 != null) { 3172 future.completeExceptionally(err3); 3173 } else { 3174 future.complete(result); 3175 } 3176 }); 3177 } 3178 }); 3179 } 3180 }); 3181 return future; 3182 } 3183 3184 @Override 3185 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3186 return this.<Void> newAdminCaller() 3187 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3188 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3189 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3190 .serverName(serverName).call(); 3191 } 3192 3193 @Override 3194 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3195 return this.<Void> newAdminCaller() 3196 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3197 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3198 RequestConverter.buildClearCompactionQueuesRequest(queues), 3199 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3200 .serverName(serverName).call(); 3201 } 3202 3203 @Override 3204 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3205 return this.<List<SecurityCapability>> newMasterCaller() 3206 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3207 List<SecurityCapability>> call(controller, stub, 3208 SecurityCapabilitiesRequest.newBuilder().build(), 3209 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3210 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3211 .call(); 3212 } 3213 3214 @Override 3215 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3216 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3217 } 3218 3219 @Override 3220 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3221 TableName tableName) { 3222 Preconditions.checkNotNull(tableName, 3223 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3224 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3225 } 3226 3227 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3228 ServerName serverName) { 3229 return this.<List<RegionMetrics>> newAdminCaller() 3230 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3231 List<RegionMetrics>> adminCall(controller, stub, request, 3232 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3233 RegionMetricsBuilder::toRegionMetrics)) 3234 .serverName(serverName).call(); 3235 } 3236 3237 @Override 3238 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3239 return this.<Boolean> newMasterCaller() 3240 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3241 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3242 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3243 resp -> resp.getInMaintenanceMode())) 3244 .call(); 3245 } 3246 3247 @Override 3248 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3249 CompactType compactType) { 3250 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3251 3252 switch (compactType) { 3253 case MOB: 3254 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3255 if (err != null) { 3256 future.completeExceptionally(err); 3257 return; 3258 } 3259 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3260 3261 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3262 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3263 GetRegionInfoResponse> adminCall(controller, stub, 3264 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3265 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3266 .call(), (resp2, err2) -> { 3267 if (err2 != null) { 3268 future.completeExceptionally(err2); 3269 } else { 3270 if (resp2.hasCompactionState()) { 3271 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3272 } else { 3273 future.complete(CompactionState.NONE); 3274 } 3275 } 3276 }); 3277 }); 3278 break; 3279 case NORMAL: 3280 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3281 if (err != null) { 3282 future.completeExceptionally(err); 3283 return; 3284 } 3285 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3286 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3287 locations.stream().filter(loc -> loc.getServerName() != null) 3288 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3289 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3290 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3291 // If any region compaction state is MAJOR_AND_MINOR 3292 // the table compaction state is MAJOR_AND_MINOR, too. 3293 if (err2 != null) { 3294 future.completeExceptionally(unwrapCompletionException(err2)); 3295 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3296 future.complete(regionState); 3297 } else { 3298 regionStates.add(regionState); 3299 } 3300 })); 3301 }); 3302 addListener( 3303 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3304 (ret, err3) -> { 3305 // If future not completed, check all regions's compaction state 3306 if (!future.isCompletedExceptionally() && !future.isDone()) { 3307 CompactionState state = CompactionState.NONE; 3308 for (CompactionState regionState : regionStates) { 3309 switch (regionState) { 3310 case MAJOR: 3311 if (state == CompactionState.MINOR) { 3312 future.complete(CompactionState.MAJOR_AND_MINOR); 3313 } else { 3314 state = CompactionState.MAJOR; 3315 } 3316 break; 3317 case MINOR: 3318 if (state == CompactionState.MAJOR) { 3319 future.complete(CompactionState.MAJOR_AND_MINOR); 3320 } else { 3321 state = CompactionState.MINOR; 3322 } 3323 break; 3324 case NONE: 3325 default: 3326 } 3327 } 3328 if (!future.isDone()) { 3329 future.complete(state); 3330 } 3331 } 3332 }); 3333 }); 3334 break; 3335 default: 3336 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3337 } 3338 3339 return future; 3340 } 3341 3342 @Override 3343 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3344 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3345 addListener(getRegionLocation(regionName), (location, err) -> { 3346 if (err != null) { 3347 future.completeExceptionally(err); 3348 return; 3349 } 3350 ServerName serverName = location.getServerName(); 3351 if (serverName == null) { 3352 future 3353 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3354 return; 3355 } 3356 addListener( 3357 this.<GetRegionInfoResponse> newAdminCaller() 3358 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3359 GetRegionInfoResponse> adminCall(controller, stub, 3360 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3361 true), 3362 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3363 .serverName(serverName).call(), 3364 (resp2, err2) -> { 3365 if (err2 != null) { 3366 future.completeExceptionally(err2); 3367 } else { 3368 if (resp2.hasCompactionState()) { 3369 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3370 } else { 3371 future.complete(CompactionState.NONE); 3372 } 3373 } 3374 }); 3375 }); 3376 return future; 3377 } 3378 3379 @Override 3380 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3381 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3382 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3383 return this.<Optional<Long>> newMasterCaller() 3384 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3385 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3386 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3387 ProtobufUtil::toOptionalTimestamp)) 3388 .call(); 3389 } 3390 3391 @Override 3392 public CompletableFuture<Optional<Long>> 3393 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3394 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3395 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3396 addListener(getRegionInfo(regionName), (region, err) -> { 3397 if (err != null) { 3398 future.completeExceptionally(err); 3399 return; 3400 } 3401 MajorCompactionTimestampForRegionRequest.Builder builder = 3402 MajorCompactionTimestampForRegionRequest.newBuilder(); 3403 builder.setRegion( 3404 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3405 addListener(this.<Optional<Long>> newMasterCaller() 3406 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3407 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3408 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3409 ProtobufUtil::toOptionalTimestamp)) 3410 .call(), (timestamp, err2) -> { 3411 if (err2 != null) { 3412 future.completeExceptionally(err2); 3413 } else { 3414 future.complete(timestamp); 3415 } 3416 }); 3417 }); 3418 return future; 3419 } 3420 3421 @Override 3422 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3423 List<String> serverNamesList) { 3424 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3425 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3426 if (err != null) { 3427 future.completeExceptionally(err); 3428 return; 3429 } 3430 // Accessed by multiple threads. 3431 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3432 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3433 serverNames.stream().forEach(serverName -> { 3434 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3435 if (err2 != null) { 3436 future.completeExceptionally(unwrapCompletionException(err2)); 3437 } else { 3438 serverStates.put(serverName, serverState); 3439 } 3440 })); 3441 }); 3442 addListener( 3443 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3444 (ret, err3) -> { 3445 if (!future.isCompletedExceptionally()) { 3446 if (err3 != null) { 3447 future.completeExceptionally(err3); 3448 } else { 3449 future.complete(serverStates); 3450 } 3451 } 3452 }); 3453 }); 3454 return future; 3455 } 3456 3457 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3458 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3459 if (serverNamesList.isEmpty()) { 3460 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3461 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3462 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3463 if (err != null) { 3464 future.completeExceptionally(err); 3465 } else { 3466 future.complete(clusterMetrics.getServersName()); 3467 } 3468 }); 3469 return future; 3470 } else { 3471 List<ServerName> serverList = new ArrayList<>(); 3472 for (String regionServerName : serverNamesList) { 3473 ServerName serverName = null; 3474 try { 3475 serverName = ServerName.valueOf(regionServerName); 3476 } catch (Exception e) { 3477 future.completeExceptionally( 3478 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3479 } 3480 if (serverName == null) { 3481 future.completeExceptionally( 3482 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3483 } else { 3484 serverList.add(serverName); 3485 } 3486 } 3487 future.complete(serverList); 3488 } 3489 return future; 3490 } 3491 3492 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3493 return this.<Boolean> newAdminCaller().serverName(serverName) 3494 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3495 Boolean> adminCall(controller, stub, 3496 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3497 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3498 .call(); 3499 } 3500 3501 @Override 3502 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3503 return this.<Boolean> newMasterCaller() 3504 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3505 Boolean> call(controller, stub, 3506 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3507 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3508 (resp) -> resp.getPrevBalanceValue())) 3509 .call(); 3510 } 3511 3512 @Override 3513 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3514 return this.<BalanceResponse> newMasterCaller() 3515 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3516 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3517 (s, c, req, done) -> s.balance(c, req, done), 3518 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3519 .call(); 3520 } 3521 3522 @Override 3523 public CompletableFuture<Boolean> isBalancerEnabled() { 3524 return this.<Boolean> newMasterCaller() 3525 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3526 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3527 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3528 .call(); 3529 } 3530 3531 @Override 3532 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3533 return this.<Boolean> newMasterCaller() 3534 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3535 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3536 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3537 (resp) -> resp.getPrevNormalizerValue())) 3538 .call(); 3539 } 3540 3541 @Override 3542 public CompletableFuture<Boolean> isNormalizerEnabled() { 3543 return this.<Boolean> newMasterCaller() 3544 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3545 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3546 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3547 .call(); 3548 } 3549 3550 @Override 3551 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3552 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3553 } 3554 3555 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3556 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3557 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3558 } 3559 3560 @Override 3561 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3562 return this.<Boolean> newMasterCaller() 3563 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3564 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3565 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3566 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3567 (resp) -> resp.getPrevValue())) 3568 .call(); 3569 } 3570 3571 @Override 3572 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3573 return this.<Boolean> newMasterCaller() 3574 .action( 3575 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3576 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3577 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3578 .call(); 3579 } 3580 3581 @Override 3582 public CompletableFuture<Boolean> runCleanerChore() { 3583 return this.<Boolean> newMasterCaller() 3584 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3585 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3586 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3587 (resp) -> resp.getCleanerChoreRan())) 3588 .call(); 3589 } 3590 3591 @Override 3592 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3593 return this.<Boolean> newMasterCaller() 3594 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3595 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3596 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3597 .call(); 3598 } 3599 3600 @Override 3601 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3602 return this.<Boolean> newMasterCaller() 3603 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3604 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3605 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3606 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3607 .call(); 3608 } 3609 3610 @Override 3611 public CompletableFuture<Integer> runCatalogJanitor() { 3612 return this.<Integer> newMasterCaller() 3613 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3614 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3615 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3616 .call(); 3617 } 3618 3619 @Override 3620 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3621 ServiceCaller<S, R> callable) { 3622 MasterCoprocessorRpcChannelImpl channel = 3623 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3624 S stub = stubMaker.apply(channel); 3625 CompletableFuture<R> future = new CompletableFuture<>(); 3626 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3627 callable.call(stub, controller, resp -> { 3628 if (controller.failed()) { 3629 future.completeExceptionally(controller.getFailed()); 3630 } else { 3631 future.complete(resp); 3632 } 3633 }); 3634 return future; 3635 } 3636 3637 @Override 3638 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3639 ServiceCaller<S, R> callable, ServerName serverName) { 3640 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3641 this.<Message> newServerCaller().serverName(serverName)); 3642 S stub = stubMaker.apply(channel); 3643 CompletableFuture<R> future = new CompletableFuture<>(); 3644 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3645 callable.call(stub, controller, resp -> { 3646 if (controller.failed()) { 3647 future.completeExceptionally(controller.getFailed()); 3648 } else { 3649 future.complete(resp); 3650 } 3651 }); 3652 return future; 3653 } 3654 3655 @Override 3656 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3657 return this.<List<ServerName>> newMasterCaller() 3658 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3659 List<ServerName>> call(controller, stub, 3660 RequestConverter.buildClearDeadServersRequest(servers), 3661 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3662 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3663 .call(); 3664 } 3665 3666 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3667 return this.connection.callerFactory.<T> serverRequest() 3668 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3669 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3670 .pause(pauseNs, TimeUnit.NANOSECONDS) 3671 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3672 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3673 } 3674 3675 @Override 3676 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3677 if (tableName == null) { 3678 return failedFuture(new IllegalArgumentException("Table name is null")); 3679 } 3680 CompletableFuture<Void> future = new CompletableFuture<>(); 3681 addListener(tableExists(tableName), (exist, err) -> { 3682 if (err != null) { 3683 future.completeExceptionally(err); 3684 return; 3685 } 3686 if (!exist) { 3687 future.completeExceptionally(new TableNotFoundException( 3688 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3689 return; 3690 } 3691 addListener(getTableSplits(tableName), (splits, err1) -> { 3692 if (err1 != null) { 3693 future.completeExceptionally(err1); 3694 } else { 3695 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3696 if (err2 != null) { 3697 future.completeExceptionally(err2); 3698 } else { 3699 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3700 if (err3 != null) { 3701 future.completeExceptionally(err3); 3702 } else { 3703 future.complete(result3); 3704 } 3705 }); 3706 } 3707 }); 3708 } 3709 }); 3710 }); 3711 return future; 3712 } 3713 3714 @Override 3715 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3716 if (tableName == null) { 3717 return failedFuture(new IllegalArgumentException("Table name is null")); 3718 } 3719 CompletableFuture<Void> future = new CompletableFuture<>(); 3720 addListener(tableExists(tableName), (exist, err) -> { 3721 if (err != null) { 3722 future.completeExceptionally(err); 3723 return; 3724 } 3725 if (!exist) { 3726 future.completeExceptionally(new TableNotFoundException( 3727 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3728 return; 3729 } 3730 addListener(setTableReplication(tableName, false), (result, err2) -> { 3731 if (err2 != null) { 3732 future.completeExceptionally(err2); 3733 } else { 3734 future.complete(result); 3735 } 3736 }); 3737 }); 3738 return future; 3739 } 3740 3741 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3742 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3743 addListener( 3744 getRegions(tableName).thenApply(regions -> regions.stream() 3745 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3746 (regions, err2) -> { 3747 if (err2 != null) { 3748 future.completeExceptionally(err2); 3749 return; 3750 } 3751 if (regions.size() == 1) { 3752 future.complete(null); 3753 } else { 3754 byte[][] splits = new byte[regions.size() - 1][]; 3755 for (int i = 1; i < regions.size(); i++) { 3756 splits[i - 1] = regions.get(i).getStartKey(); 3757 } 3758 future.complete(splits); 3759 } 3760 }); 3761 return future; 3762 } 3763 3764 /** 3765 * Connect to peer and check the table descriptor on peer: 3766 * <ol> 3767 * <li>Create the same table on peer when not exist.</li> 3768 * <li>Throw an exception if the table already has replication enabled on any of the column 3769 * families.</li> 3770 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3771 * </ol> 3772 * @param tableName name of the table to sync to the peer 3773 * @param splits table split keys 3774 */ 3775 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3776 byte[][] splits) { 3777 CompletableFuture<Void> future = new CompletableFuture<>(); 3778 addListener(listReplicationPeers(), (peers, err) -> { 3779 if (err != null) { 3780 future.completeExceptionally(err); 3781 return; 3782 } 3783 if (peers == null || peers.size() <= 0) { 3784 future.completeExceptionally( 3785 new IllegalArgumentException("Found no peer cluster for replication.")); 3786 return; 3787 } 3788 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3789 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3790 .forEach(peer -> { 3791 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3792 }); 3793 addListener( 3794 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3795 (result, err2) -> { 3796 if (err2 != null) { 3797 future.completeExceptionally(err2); 3798 } else { 3799 future.complete(result); 3800 } 3801 }); 3802 }); 3803 return future; 3804 } 3805 3806 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3807 ReplicationPeerDescription peer) { 3808 Configuration peerConf; 3809 try { 3810 peerConf = ReplicationPeerConfigUtil 3811 .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig()); 3812 } catch (IOException e) { 3813 return failedFuture(e); 3814 } 3815 URI connectionUri = 3816 ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey()); 3817 CompletableFuture<Void> future = new CompletableFuture<>(); 3818 addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> { 3819 if (err != null) { 3820 future.completeExceptionally(err); 3821 return; 3822 } 3823 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3824 if (err1 != null) { 3825 future.completeExceptionally(err1); 3826 return; 3827 } 3828 AsyncAdmin peerAdmin = conn.getAdmin(); 3829 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3830 if (err2 != null) { 3831 future.completeExceptionally(err2); 3832 return; 3833 } 3834 if (!exist) { 3835 CompletableFuture<Void> createTableFuture = null; 3836 if (splits == null) { 3837 createTableFuture = peerAdmin.createTable(tableDesc); 3838 } else { 3839 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3840 } 3841 addListener(createTableFuture, (result, err3) -> { 3842 if (err3 != null) { 3843 future.completeExceptionally(err3); 3844 } else { 3845 future.complete(result); 3846 } 3847 }); 3848 } else { 3849 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3850 (result, err4) -> { 3851 if (err4 != null) { 3852 future.completeExceptionally(err4); 3853 } else { 3854 future.complete(result); 3855 } 3856 }); 3857 } 3858 }); 3859 }); 3860 }); 3861 return future; 3862 } 3863 3864 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3865 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3866 CompletableFuture<Void> future = new CompletableFuture<>(); 3867 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3868 if (err != null) { 3869 future.completeExceptionally(err); 3870 return; 3871 } 3872 if (peerTableDesc == null) { 3873 future.completeExceptionally( 3874 new IllegalArgumentException("Failed to get table descriptor for table " 3875 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3876 return; 3877 } 3878 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3879 future.completeExceptionally(new IllegalArgumentException( 3880 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3881 + ", but the table descriptors are not same when compared with source cluster." 3882 + " Thus can not enable the table's replication switch.")); 3883 return; 3884 } 3885 future.complete(null); 3886 }); 3887 return future; 3888 } 3889 3890 /** 3891 * Set the table's replication switch if the table's replication switch is already not set. 3892 * @param tableName name of the table 3893 * @param enableRep is replication switch enable or disable 3894 */ 3895 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3896 CompletableFuture<Void> future = new CompletableFuture<>(); 3897 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3898 if (err != null) { 3899 future.completeExceptionally(err); 3900 return; 3901 } 3902 if (!tableDesc.matchReplicationScope(enableRep)) { 3903 int scope = 3904 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3905 TableDescriptor newTableDesc = 3906 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3907 addListener(modifyTable(newTableDesc), (result, err2) -> { 3908 if (err2 != null) { 3909 future.completeExceptionally(err2); 3910 } else { 3911 future.complete(result); 3912 } 3913 }); 3914 } else { 3915 future.complete(null); 3916 } 3917 }); 3918 return future; 3919 } 3920 3921 @Override 3922 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 3923 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 3924 request.setPeerId(peerId); 3925 return this.<Boolean> newMasterCaller() 3926 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 3927 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 3928 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 3929 resp -> resp.getIsEnabled())) 3930 .call(); 3931 } 3932 3933 private void waitUntilAllReplicationPeerModificationProceduresDone( 3934 CompletableFuture<Boolean> future, boolean prevOn, int retries) { 3935 CompletableFuture<List<ProcedureProtos.Procedure>> callFuture = 3936 this.<List<ProcedureProtos.Procedure>> newMasterCaller() 3937 .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest, 3938 GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call( 3939 controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), 3940 (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), 3941 resp -> resp.getProcedureList())) 3942 .call(); 3943 addListener(callFuture, (r, e) -> { 3944 if (e != null) { 3945 future.completeExceptionally(e); 3946 } else if (r.isEmpty()) { 3947 // we are done 3948 future.complete(prevOn); 3949 } else { 3950 // retry later to see if the procedures are done 3951 retryTimer.newTimeout( 3952 t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), 3953 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3954 } 3955 }); 3956 } 3957 3958 @Override 3959 public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, 3960 boolean drainProcedures) { 3961 ReplicationPeerModificationSwitchRequest request = 3962 ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); 3963 CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller() 3964 .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest, 3965 ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request, 3966 (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), 3967 resp -> resp.getPreviousValue())) 3968 .call(); 3969 // if we do not need to wait all previous peer modification procedure done, or we are enabling 3970 // peer modification, just return here. 3971 if (!drainProcedures || on) { 3972 return callFuture; 3973 } 3974 // otherwise we need to wait until all previous peer modification procedure done 3975 CompletableFuture<Boolean> future = new CompletableFuture<>(); 3976 addListener(callFuture, (prevOn, err) -> { 3977 if (err != null) { 3978 future.completeExceptionally(err); 3979 return; 3980 } 3981 // even if the previous state is disabled, we still need to wait here, as there could be 3982 // another client thread which called this method just before us and have already changed the 3983 // state to off, but there are still peer modification procedures not finished, so we should 3984 // also wait here. 3985 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 3986 }); 3987 return future; 3988 } 3989 3990 @Override 3991 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 3992 return this.<Boolean> newMasterCaller() 3993 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 3994 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 3995 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 3996 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 3997 (resp) -> resp.getEnabled())) 3998 .call(); 3999 } 4000 4001 @Override 4002 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 4003 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 4004 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 4005 if (err != null) { 4006 future.completeExceptionally(err); 4007 return; 4008 } 4009 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 4010 locations.stream().filter(l -> l.getRegion() != null) 4011 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 4012 .collect(Collectors.groupingBy(l -> l.getServerName(), 4013 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 4014 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 4015 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 4016 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 4017 futures 4018 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 4019 if (err2 != null) { 4020 future.completeExceptionally(unwrapCompletionException(err2)); 4021 } else { 4022 aggregator.append(stats); 4023 } 4024 })); 4025 } 4026 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 4027 (ret, err3) -> { 4028 if (err3 != null) { 4029 future.completeExceptionally(unwrapCompletionException(err3)); 4030 } else { 4031 future.complete(aggregator.sum()); 4032 } 4033 }); 4034 }); 4035 return future; 4036 } 4037 4038 @Override 4039 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 4040 boolean preserveSplits) { 4041 CompletableFuture<Void> future = new CompletableFuture<>(); 4042 addListener(tableExists(tableName), (exist, err) -> { 4043 if (err != null) { 4044 future.completeExceptionally(err); 4045 return; 4046 } 4047 if (!exist) { 4048 future.completeExceptionally(new TableNotFoundException(tableName)); 4049 return; 4050 } 4051 addListener(tableExists(newTableName), (exist1, err1) -> { 4052 if (err1 != null) { 4053 future.completeExceptionally(err1); 4054 return; 4055 } 4056 if (exist1) { 4057 future.completeExceptionally(new TableExistsException(newTableName)); 4058 return; 4059 } 4060 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 4061 if (err2 != null) { 4062 future.completeExceptionally(err2); 4063 return; 4064 } 4065 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 4066 if (preserveSplits) { 4067 addListener(getTableSplits(tableName), (splits, err3) -> { 4068 if (err3 != null) { 4069 future.completeExceptionally(err3); 4070 } else { 4071 addListener( 4072 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 4073 (result, err4) -> { 4074 if (err4 != null) { 4075 future.completeExceptionally(err4); 4076 } else { 4077 future.complete(result); 4078 } 4079 }); 4080 } 4081 }); 4082 } else { 4083 addListener(createTable(newTableDesc), (result, err5) -> { 4084 if (err5 != null) { 4085 future.completeExceptionally(err5); 4086 } else { 4087 future.complete(result); 4088 } 4089 }); 4090 } 4091 }); 4092 }); 4093 }); 4094 return future; 4095 } 4096 4097 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 4098 List<RegionInfo> hris) { 4099 return this.<CacheEvictionStats> newAdminCaller() 4100 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 4101 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 4102 RequestConverter.buildClearRegionBlockCacheRequest(hris), 4103 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 4104 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 4105 .serverName(serverName).call(); 4106 } 4107 4108 @Override 4109 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 4110 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4111 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 4112 Boolean> call(controller, stub, 4113 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 4114 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 4115 resp -> resp.getPreviousRpcThrottleEnabled())) 4116 .call(); 4117 return future; 4118 } 4119 4120 @Override 4121 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 4122 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4123 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 4124 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 4125 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 4126 resp -> resp.getRpcThrottleEnabled())) 4127 .call(); 4128 return future; 4129 } 4130 4131 @Override 4132 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 4133 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4134 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 4135 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 4136 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 4137 .build(), 4138 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 4139 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 4140 .call(); 4141 return future; 4142 } 4143 4144 @Override 4145 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 4146 return this.<Map<TableName, Long>> newMasterCaller() 4147 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 4148 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4149 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4150 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4151 resp -> resp.getSizesList().stream().collect(Collectors 4152 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4153 .call(); 4154 } 4155 4156 @Override 4157 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4158 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4159 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4160 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4161 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4162 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4163 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4164 resp -> resp.getSnapshotsList().stream() 4165 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 4166 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4167 .serverName(serverName).call(); 4168 } 4169 4170 private CompletableFuture<SpaceQuotaSnapshot> 4171 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4172 return this.<SpaceQuotaSnapshot> newMasterCaller() 4173 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4174 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4175 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4176 .call(); 4177 } 4178 4179 @Override 4180 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4181 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4182 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4183 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4184 } 4185 4186 @Override 4187 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4188 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4189 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4190 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4191 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4192 } 4193 4194 @Override 4195 public CompletableFuture<Void> grant(UserPermission userPermission, 4196 boolean mergeExistingPermissions) { 4197 return this.<Void> newMasterCaller() 4198 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4199 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4200 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4201 .call(); 4202 } 4203 4204 @Override 4205 public CompletableFuture<Void> revoke(UserPermission userPermission) { 4206 return this.<Void> newMasterCaller() 4207 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4208 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4209 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4210 .call(); 4211 } 4212 4213 @Override 4214 public CompletableFuture<List<UserPermission>> 4215 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4216 return this.<List<UserPermission>> newMasterCaller() 4217 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4218 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4219 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4220 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4221 resp -> resp.getUserPermissionList().stream() 4222 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4223 .collect(Collectors.toList()))) 4224 .call(); 4225 } 4226 4227 @Override 4228 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4229 List<Permission> permissions) { 4230 return this.<List<Boolean>> newMasterCaller() 4231 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4232 List<Boolean>> call(controller, stub, 4233 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4234 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4235 resp -> resp.getHasUserPermissionList())) 4236 .call(); 4237 } 4238 4239 @Override 4240 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4241 return this.<Boolean> newMasterCaller() 4242 .action((controller, stub) -> this.call(controller, stub, 4243 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4244 MasterService.Interface::switchSnapshotCleanup, 4245 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4246 .call(); 4247 } 4248 4249 @Override 4250 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4251 return this.<Boolean> newMasterCaller() 4252 .action((controller, stub) -> this.call(controller, stub, 4253 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4254 MasterService.Interface::isSnapshotCleanupEnabled, 4255 IsSnapshotCleanupEnabledResponse::getEnabled)) 4256 .call(); 4257 } 4258 4259 @Override 4260 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4261 return this.<Void> newMasterCaller() 4262 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4263 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4264 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4265 .call(); 4266 } 4267 4268 @Override 4269 public CompletableFuture<Void> addRSGroup(String groupName) { 4270 return this.<Void> newMasterCaller() 4271 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4272 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4273 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4274 .call(); 4275 } 4276 4277 @Override 4278 public CompletableFuture<Void> removeRSGroup(String groupName) { 4279 return this.<Void> newMasterCaller() 4280 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4281 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4282 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4283 .call(); 4284 } 4285 4286 @Override 4287 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4288 BalanceRequest request) { 4289 return this.<BalanceResponse> newMasterCaller() 4290 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4291 BalanceResponse> call(controller, stub, 4292 ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4293 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4294 .call(); 4295 } 4296 4297 @Override 4298 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4299 return this.<List<RSGroupInfo>> newMasterCaller() 4300 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4301 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4302 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4303 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4304 .call(); 4305 } 4306 4307 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4308 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4309 final String logType) { 4310 if (CollectionUtils.isEmpty(serverNames)) { 4311 return CompletableFuture.completedFuture(Collections.emptyList()); 4312 } 4313 return CompletableFuture.supplyAsync(() -> serverNames 4314 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4315 filterParams, limit, logType)) 4316 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4317 } 4318 4319 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4320 Map<String, Object> filterParams, int limit, String logType) { 4321 return this.<List<LogEntry>> newAdminCaller() 4322 .action((controller, stub) -> this.adminCall(controller, stub, 4323 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4324 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4325 .serverName(serverName).call(); 4326 } 4327 4328 @Override 4329 public CompletableFuture<List<Boolean>> 4330 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4331 if (CollectionUtils.isEmpty(serverNames)) { 4332 return CompletableFuture.completedFuture(Collections.emptyList()); 4333 } 4334 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4335 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4336 return convertToFutureOfList(clearSlowLogResponseList); 4337 } 4338 4339 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4340 return this.<Boolean> newAdminCaller() 4341 .action((controller, stub) -> this.adminCall(controller, stub, 4342 RequestConverter.buildClearSlowLogResponseRequest(), 4343 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4344 .serverName(serverName).call(); 4345 } 4346 4347 private static <T> CompletableFuture<List<T>> 4348 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4349 CompletableFuture<Void> allDoneFuture = 4350 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4351 return allDoneFuture 4352 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4353 } 4354 4355 @Override 4356 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4357 return this.<List<TableName>> newMasterCaller() 4358 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4359 List<TableName>> call(controller, stub, 4360 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4361 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4362 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4363 .call(); 4364 } 4365 4366 @Override 4367 public CompletableFuture<Pair<List<String>, List<TableName>>> 4368 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4369 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4370 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4371 GetConfiguredNamespacesAndTablesInRSGroupResponse, 4372 Pair<List<String>, List<TableName>>> call(controller, stub, 4373 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4374 .build(), 4375 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4376 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4377 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4378 .call(); 4379 } 4380 4381 @Override 4382 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4383 return this.<RSGroupInfo> newMasterCaller() 4384 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4385 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4386 GetRSGroupInfoOfServerRequest.newBuilder() 4387 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4388 .setPort(hostPort.getPort()).build()) 4389 .build(), 4390 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4391 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4392 .call(); 4393 } 4394 4395 @Override 4396 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4397 return this.<Void> newMasterCaller() 4398 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4399 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4400 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4401 .call(); 4402 } 4403 4404 @Override 4405 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4406 CompletableFuture<Void> future = new CompletableFuture<>(); 4407 for (TableName tableName : tables) { 4408 addListener(tableExists(tableName), (exist, err) -> { 4409 if (err != null) { 4410 future.completeExceptionally(err); 4411 return; 4412 } 4413 if (!exist) { 4414 future.completeExceptionally(new TableNotFoundException(tableName)); 4415 return; 4416 } 4417 }); 4418 } 4419 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4420 if (err != null) { 4421 future.completeExceptionally(err); 4422 return; 4423 } 4424 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4425 future.complete(null); 4426 return; 4427 } 4428 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4429 for (TableDescriptor td : tableDescriptions) { 4430 newTableDescriptors 4431 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4432 } 4433 addListener( 4434 CompletableFuture.allOf( 4435 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4436 (v, e) -> { 4437 if (e != null) { 4438 future.completeExceptionally(e); 4439 } else { 4440 future.complete(v); 4441 } 4442 }); 4443 }); 4444 return future; 4445 } 4446 4447 @Override 4448 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4449 return this.<RSGroupInfo> newMasterCaller() 4450 .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4451 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4452 GetRSGroupInfoOfTableRequest.newBuilder() 4453 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4454 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4455 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4456 .call(); 4457 } 4458 4459 @Override 4460 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4461 return this.<RSGroupInfo> newMasterCaller() 4462 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4463 RSGroupInfo> call(controller, stub, 4464 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4465 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4466 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4467 .call(); 4468 } 4469 4470 @Override 4471 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4472 return this.<Void> newMasterCaller() 4473 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4474 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4475 .setNewRsgroupName(newName).build(), 4476 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4477 .call(); 4478 } 4479 4480 @Override 4481 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4482 Map<String, String> configuration) { 4483 UpdateRSGroupConfigRequest.Builder request = 4484 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4485 if (configuration != null) { 4486 configuration.entrySet().forEach(e -> request.addConfiguration( 4487 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4488 } 4489 return this.<Void> newMasterCaller() 4490 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4491 Void> call(controller, stub, request.build(), 4492 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4493 .call(); 4494 } 4495 4496 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4497 return this.<List<LogEntry>> newMasterCaller() 4498 .action((controller, stub) -> this.call(controller, stub, 4499 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4500 ProtobufUtil::toBalancerDecisionResponse)) 4501 .call(); 4502 } 4503 4504 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4505 return this.<List<LogEntry>> newMasterCaller() 4506 .action((controller, stub) -> this.call(controller, stub, 4507 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4508 ProtobufUtil::toBalancerRejectionResponse)) 4509 .call(); 4510 } 4511 4512 @Override 4513 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4514 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4515 if (logType == null || serverType == null) { 4516 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4517 } 4518 switch (logType) { 4519 case "SLOW_LOG": 4520 case "LARGE_LOG": 4521 if (ServerType.MASTER.equals(serverType)) { 4522 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4523 } 4524 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4525 case "BALANCER_DECISION": 4526 if (ServerType.REGION_SERVER.equals(serverType)) { 4527 throw new IllegalArgumentException( 4528 "Balancer Decision logs are not maintained by HRegionServer"); 4529 } 4530 return getBalancerDecisions(limit); 4531 case "BALANCER_REJECTION": 4532 if (ServerType.REGION_SERVER.equals(serverType)) { 4533 throw new IllegalArgumentException( 4534 "Balancer Rejection logs are not maintained by HRegionServer"); 4535 } 4536 return getBalancerRejections(limit); 4537 default: 4538 return CompletableFuture.completedFuture(Collections.emptyList()); 4539 } 4540 } 4541 4542 @Override 4543 public CompletableFuture<Void> flushMasterStore() { 4544 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4545 return this.<Void> newMasterCaller() 4546 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4547 Void> call(controller, stub, request.build(), 4548 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4549 .call(); 4550 } 4551 4552 @Override 4553 public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) { 4554 GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder(); 4555 return this.<List<String>> newAdminCaller() 4556 .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse, 4557 List<String>> adminCall(controller, stub, request.build(), 4558 (s, c, req, done) -> s.getCachedFilesList(c, req, done), 4559 resp -> resp.getCachedFilesList())) 4560 .serverName(serverName).call(); 4561 } 4562}