001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import edu.umd.cs.findbugs.annotations.Nullable; 026import java.io.IOException; 027import java.net.URI; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.Set; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.function.BiConsumer; 043import java.util.function.Consumer; 044import java.util.function.Function; 045import java.util.function.Supplier; 046import java.util.regex.Pattern; 047import java.util.stream.Collectors; 048import java.util.stream.Stream; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 052import org.apache.hadoop.hbase.CatalogFamilyFormat; 053import org.apache.hadoop.hbase.ClientMetaTableAccessor; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.HRegionLocation; 060import org.apache.hadoop.hbase.NamespaceDescriptor; 061import org.apache.hadoop.hbase.RegionLocations; 062import org.apache.hadoop.hbase.RegionMetrics; 063import org.apache.hadoop.hbase.RegionMetricsBuilder; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableExistsException; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotDisabledException; 068import org.apache.hadoop.hbase.TableNotEnabledException; 069import org.apache.hadoop.hbase.TableNotFoundException; 070import org.apache.hadoop.hbase.UnknownRegionException; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.Scan.ReadType; 075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 076import org.apache.hadoop.hbase.client.replication.TableCFs; 077import org.apache.hadoop.hbase.client.security.SecurityCapability; 078import org.apache.hadoop.hbase.exceptions.DeserializationException; 079import org.apache.hadoop.hbase.ipc.HBaseRpcController; 080import org.apache.hadoop.hbase.net.Address; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 086import org.apache.hadoop.hbase.replication.ReplicationException; 087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 089import org.apache.hadoop.hbase.replication.SyncReplicationState; 090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 092import org.apache.hadoop.hbase.security.access.Permission; 093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 094import org.apache.hadoop.hbase.security.access.UserPermission; 095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 100import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 101import org.apache.hadoop.hbase.util.Pair; 102import org.apache.hadoop.hbase.util.Strings; 103import org.apache.yetus.audience.InterfaceAudience; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 108import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 109import org.apache.hbase.thirdparty.com.google.protobuf.Message; 110import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 111import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 112import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 113import org.apache.hbase.thirdparty.io.netty.util.Timeout; 114import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 116 117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 118import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 307import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 308import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 309import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 310import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest; 316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse; 317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest; 326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse; 327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 333import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest; 334import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse; 335import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest; 336import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse; 337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 357import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest; 358import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; 359import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 360import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 361import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 362 363/** 364 * The implementation of AsyncAdmin. 365 * <p> 366 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 367 * be finished inside the rpc framework thread, which means that the callbacks registered to the 368 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 369 * this class should not try to do time consuming tasks in the callbacks. 370 * @since 2.0.0 371 * @see AsyncHBaseAdmin 372 * @see AsyncConnection#getAdmin() 373 * @see AsyncConnection#getAdminBuilder() 374 */ 375@InterfaceAudience.Private 376class RawAsyncHBaseAdmin implements AsyncAdmin { 377 378 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 379 380 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 381 382 private final AsyncConnectionImpl connection; 383 384 private final HashedWheelTimer retryTimer; 385 386 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 387 388 private final long rpcTimeoutNs; 389 390 private final long operationTimeoutNs; 391 392 private final long pauseNs; 393 394 private final long pauseNsForServerOverloaded; 395 396 private final int maxAttempts; 397 398 private final int startLogErrorsCnt; 399 400 private final NonceGenerator ng; 401 402 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 403 AsyncAdminBuilderBase builder) { 404 this.connection = connection; 405 this.retryTimer = retryTimer; 406 this.metaTable = connection.getTable(META_TABLE_NAME); 407 this.rpcTimeoutNs = builder.rpcTimeoutNs; 408 this.operationTimeoutNs = builder.operationTimeoutNs; 409 this.pauseNs = builder.pauseNs; 410 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 411 LOG.warn( 412 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 413 + " the normal pause value {} ms, use the greater one instead", 414 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 415 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 416 this.pauseNsForServerOverloaded = builder.pauseNs; 417 } else { 418 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 419 } 420 this.maxAttempts = builder.maxAttempts; 421 this.startLogErrorsCnt = builder.startLogErrorsCnt; 422 this.ng = connection.getNonceGenerator(); 423 } 424 425 <T> MasterRequestCallerBuilder<T> newMasterCaller() { 426 return this.connection.callerFactory.<T> masterRequest() 427 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 428 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 429 .pause(pauseNs, TimeUnit.NANOSECONDS) 430 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 431 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 432 } 433 434 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 435 return this.connection.callerFactory.<T> adminRequest() 436 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 437 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 438 .pause(pauseNs, TimeUnit.NANOSECONDS) 439 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 440 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 441 } 442 443 @FunctionalInterface 444 private interface MasterRpcCall<RESP, REQ> { 445 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 446 RpcCallback<RESP> done); 447 } 448 449 @FunctionalInterface 450 private interface AdminRpcCall<RESP, REQ> { 451 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 452 RpcCallback<RESP> done); 453 } 454 455 @FunctionalInterface 456 private interface Converter<D, S> { 457 D convert(S src) throws IOException; 458 } 459 460 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 461 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 462 Converter<RESP, PRESP> respConverter) { 463 CompletableFuture<RESP> future = new CompletableFuture<>(); 464 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 465 466 @Override 467 public void run(PRESP resp) { 468 if (controller.failed()) { 469 future.completeExceptionally(controller.getFailed()); 470 } else { 471 try { 472 future.complete(respConverter.convert(resp)); 473 } catch (IOException e) { 474 future.completeExceptionally(e); 475 } 476 } 477 } 478 }); 479 return future; 480 } 481 482 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 483 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 484 Converter<RESP, PRESP> respConverter) { 485 CompletableFuture<RESP> future = new CompletableFuture<>(); 486 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 487 488 @Override 489 public void run(PRESP resp) { 490 if (controller.failed()) { 491 future.completeExceptionally(controller.getFailed()); 492 } else { 493 try { 494 future.complete(respConverter.convert(resp)); 495 } catch (IOException e) { 496 future.completeExceptionally(e); 497 } 498 } 499 } 500 }); 501 return future; 502 } 503 504 /** 505 * short-circuit call for 506 * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)} 507 * by ignoring procedure result 508 */ 509 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 510 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 511 ProcedureBiConsumer<Void> consumer) { 512 return procedureCall(preq, rpcCall, respConverter, result -> null, consumer); 513 } 514 515 /** 516 * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, 517 * ProcedureBiConsumer) by skip setting priority for request 518 */ 519 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq, 520 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 521 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 522 return procedureCall(b -> { 523 }, preq, rpcCall, respConverter, resultConverter, consumer); 524 } 525 526 /** 527 * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter, 528 * ProcedureBiConsumer) by ignoring procedure result 529 */ 530 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 531 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 532 ProcedureBiConsumer<Void> consumer) { 533 return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer); 534 } 535 536 /** 537 * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, 538 * ProcedureBiConsumer) by skip setting priority for request 539 */ 540 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq, 541 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 542 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 543 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter, 544 consumer); 545 } 546 547 /** 548 * @param <PREQ> type of request 549 * @param <PRESP> type of response 550 * @param <PRES> type of procedure call result 551 * @param prioritySetter prioritySetter set priority by table for request 552 * @param preq procedure call request 553 * @param rpcCall procedure rpc call 554 * @param respConverter extract proc id from procedure call response 555 * @param resultConverter extract result from procedure call result 556 * @param consumer action performs on result 557 * @return procedure call result, null if procedure is void 558 */ 559 private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall( 560 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 561 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 562 Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { 563 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller() 564 .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter)); 565 prioritySetter.accept(builder); 566 CompletableFuture<Long> procFuture = builder.call(); 567 CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter); 568 addListener(future, consumer); 569 return future; 570 } 571 572 @Override 573 public CompletableFuture<Boolean> tableExists(TableName tableName) { 574 if (TableName.isMetaTableName(tableName)) { 575 return CompletableFuture.completedFuture(true); 576 } 577 return ClientMetaTableAccessor.tableExists(metaTable, tableName); 578 } 579 580 @Override 581 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 582 return getTableDescriptors( 583 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 584 } 585 586 /** 587 * {@link #listTableDescriptors(boolean)} 588 */ 589 @Override 590 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 591 boolean includeSysTables) { 592 Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, " 593 + "use listTableDescriptors(boolean) instead"); 594 return getTableDescriptors( 595 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 596 } 597 598 @Override 599 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 600 Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, " 601 + "use listTableDescriptors(boolean) instead"); 602 if (tableNames.isEmpty()) { 603 return CompletableFuture.completedFuture(Collections.emptyList()); 604 } 605 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 606 } 607 608 private CompletableFuture<List<TableDescriptor>> 609 getTableDescriptors(GetTableDescriptorsRequest request) { 610 return this.<List<TableDescriptor>> newMasterCaller() 611 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 612 List<TableDescriptor>> call(controller, stub, request, 613 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 614 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 615 .call(); 616 } 617 618 @Override 619 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 620 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 621 } 622 623 @Override 624 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 625 boolean includeSysTables) { 626 Preconditions.checkNotNull(pattern, 627 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead"); 628 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 629 } 630 631 @Override 632 public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) { 633 return this.<List<TableName>> newMasterCaller() 634 .action((controller, stub) -> this.<ListTableNamesByStateRequest, 635 ListTableNamesByStateResponse, List<TableName>> call(controller, stub, 636 ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 637 (s, c, req, done) -> s.listTableNamesByState(c, req, done), 638 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 639 .call(); 640 } 641 642 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 643 return this.<List<TableName>> newMasterCaller() 644 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 645 List<TableName>> call(controller, stub, request, 646 (s, c, req, done) -> s.getTableNames(c, req, done), 647 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 648 .call(); 649 } 650 651 @Override 652 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 653 return this.<List<TableDescriptor>> newMasterCaller() 654 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 655 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 656 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 657 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 658 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 659 .call(); 660 } 661 662 @Override 663 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) { 664 return this.<List<TableDescriptor>> newMasterCaller() 665 .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest, 666 ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub, 667 ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(), 668 (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done), 669 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 670 .call(); 671 } 672 673 @Override 674 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 675 return this.<List<TableName>> newMasterCaller() 676 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 677 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 678 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 679 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 680 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 681 .call(); 682 } 683 684 @Override 685 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 686 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 687 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 688 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 689 List<TableSchema>> call(controller, stub, 690 RequestConverter.buildGetTableDescriptorsRequest(tableName), 691 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 692 (resp) -> resp.getTableSchemaList())) 693 .call(), (tableSchemas, error) -> { 694 if (error != null) { 695 future.completeExceptionally(error); 696 return; 697 } 698 if (!tableSchemas.isEmpty()) { 699 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 700 } else { 701 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 702 } 703 }); 704 return future; 705 } 706 707 @Override 708 public CompletableFuture<Void> createTable(TableDescriptor desc) { 709 return createTable(desc.getTableName(), 710 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 711 } 712 713 @Override 714 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 715 int numRegions) { 716 try { 717 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 718 } catch (IllegalArgumentException e) { 719 return failedFuture(e); 720 } 721 } 722 723 @Override 724 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 725 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 726 + " use createTable(TableDescriptor) instead"); 727 try { 728 verifySplitKeys(splitKeys); 729 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 730 splitKeys, ng.getNonceGroup(), ng.newNonce())); 731 } catch (IllegalArgumentException e) { 732 return failedFuture(e); 733 } 734 } 735 736 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 737 Preconditions.checkNotNull(tableName, "table name is null"); 738 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 739 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 740 new CreateTableProcedureBiConsumer(tableName)); 741 } 742 743 @Override 744 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 745 return modifyTable(desc, true); 746 } 747 748 @Override 749 public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) { 750 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 751 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 752 ng.newNonce(), reopenRegions), 753 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 754 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 755 } 756 757 @Override 758 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 759 return this.<ModifyTableStoreFileTrackerRequest, 760 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 761 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 762 ng.getNonceGroup(), ng.newNonce()), 763 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 764 (resp) -> resp.getProcId(), 765 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 766 } 767 768 @Override 769 public CompletableFuture<Void> deleteTable(TableName tableName) { 770 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 771 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 772 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 773 new DeleteTableProcedureBiConsumer(tableName)); 774 } 775 776 @Override 777 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 778 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 779 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 780 ng.newNonce()), 781 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 782 new TruncateTableProcedureBiConsumer(tableName)); 783 } 784 785 @Override 786 public CompletableFuture<Void> enableTable(TableName tableName) { 787 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 788 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 789 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 790 new EnableTableProcedureBiConsumer(tableName)); 791 } 792 793 @Override 794 public CompletableFuture<Void> disableTable(TableName tableName) { 795 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 796 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 797 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 798 new DisableTableProcedureBiConsumer(tableName)); 799 } 800 801 /** 802 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 803 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 804 * targetState). 805 */ 806 private static CompletableFuture<Boolean> completeCheckTableState( 807 CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error, 808 TableState.State targetState, TableName tableName) { 809 if (error != null) { 810 future.completeExceptionally(error); 811 } else { 812 if (tableState.isPresent()) { 813 future.complete(tableState.get().inStates(targetState)); 814 } else { 815 future.completeExceptionally(new TableNotFoundException(tableName)); 816 } 817 } 818 return future; 819 } 820 821 @Override 822 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 823 if (TableName.isMetaTableName(tableName)) { 824 return CompletableFuture.completedFuture(true); 825 } 826 CompletableFuture<Boolean> future = new CompletableFuture<>(); 827 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 828 (tableState, error) -> { 829 completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName); 830 }); 831 return future; 832 } 833 834 @Override 835 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 836 if (TableName.isMetaTableName(tableName)) { 837 return CompletableFuture.completedFuture(false); 838 } 839 CompletableFuture<Boolean> future = new CompletableFuture<>(); 840 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName), 841 (tableState, error) -> { 842 completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName); 843 }); 844 return future; 845 } 846 847 @Override 848 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 849 if (TableName.isMetaTableName(tableName)) { 850 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 851 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 852 } 853 CompletableFuture<Boolean> future = new CompletableFuture<>(); 854 addListener(isTableEnabled(tableName), (enabled, error) -> { 855 if (error != null) { 856 if (error instanceof TableNotFoundException) { 857 future.complete(false); 858 } else { 859 future.completeExceptionally(error); 860 } 861 return; 862 } 863 if (!enabled) { 864 future.complete(false); 865 } else { 866 addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 867 (locations, error1) -> { 868 if (error1 != null) { 869 future.completeExceptionally(error1); 870 return; 871 } 872 List<HRegionLocation> notDeployedRegions = locations.stream() 873 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 874 if (notDeployedRegions.size() > 0) { 875 if (LOG.isDebugEnabled()) { 876 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 877 } 878 future.complete(false); 879 return; 880 } 881 future.complete(true); 882 }); 883 } 884 }); 885 return future; 886 } 887 888 @Override 889 public CompletableFuture<Void> addColumnFamily(TableName tableName, 890 ColumnFamilyDescriptor columnFamily) { 891 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 892 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 893 ng.newNonce()), 894 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 895 new AddColumnFamilyProcedureBiConsumer(tableName)); 896 } 897 898 @Override 899 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 900 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 901 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 902 ng.newNonce()), 903 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 904 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 905 } 906 907 @Override 908 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 909 ColumnFamilyDescriptor columnFamily) { 910 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 911 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 912 ng.newNonce()), 913 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 914 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 915 } 916 917 @Override 918 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 919 byte[] family, String dstSFT) { 920 return this.<ModifyColumnStoreFileTrackerRequest, 921 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 922 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 923 ng.getNonceGroup(), ng.newNonce()), 924 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 925 (resp) -> resp.getProcId(), 926 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 927 } 928 929 @Override 930 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 931 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 932 RequestConverter.buildCreateNamespaceRequest(descriptor), 933 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 934 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 935 } 936 937 @Override 938 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 939 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 940 RequestConverter.buildModifyNamespaceRequest(descriptor), 941 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 942 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 943 } 944 945 @Override 946 public CompletableFuture<Void> deleteNamespace(String name) { 947 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 948 RequestConverter.buildDeleteNamespaceRequest(name), 949 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 950 new DeleteNamespaceProcedureBiConsumer(name)); 951 } 952 953 @Override 954 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 955 return this.<NamespaceDescriptor> newMasterCaller() 956 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 957 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 958 RequestConverter.buildGetNamespaceDescriptorRequest(name), 959 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 960 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 961 .call(); 962 } 963 964 @Override 965 public CompletableFuture<List<String>> listNamespaces() { 966 return this.<List<String>> newMasterCaller() 967 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 968 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 969 (s, c, req, done) -> s.listNamespaces(c, req, done), 970 (resp) -> resp.getNamespaceNameList())) 971 .call(); 972 } 973 974 @Override 975 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 976 return this.<List<NamespaceDescriptor>> newMasterCaller() 977 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 978 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 979 ListNamespaceDescriptorsRequest.newBuilder().build(), 980 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 981 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 982 .call(); 983 } 984 985 @Override 986 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 987 return this.<List<RegionInfo>> newAdminCaller() 988 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 989 List<RegionInfo>> adminCall(controller, stub, 990 RequestConverter.buildGetOnlineRegionRequest(), 991 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 992 resp -> ProtobufUtil.getRegionInfos(resp))) 993 .serverName(serverName).call(); 994 } 995 996 @Override 997 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 998 if (tableName.equals(META_TABLE_NAME)) { 999 return connection.registry.getMetaRegionLocations() 1000 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 1001 .collect(Collectors.toList())); 1002 } else { 1003 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 1004 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 1005 } 1006 } 1007 1008 @Override 1009 public CompletableFuture<Void> flush(TableName tableName) { 1010 return flush(tableName, Collections.emptyList()); 1011 } 1012 1013 @Override 1014 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 1015 Preconditions.checkNotNull(columnFamily, 1016 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 1017 return flush(tableName, Collections.singletonList(columnFamily)); 1018 } 1019 1020 @Override 1021 public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { 1022 // This is for keeping compatibility with old implementation. 1023 // If the server version is lower than the client version, it's possible that the 1024 // flushTable method is not present in the server side, if so, we need to fall back 1025 // to the old implementation. 1026 Preconditions.checkNotNull(columnFamilyList, 1027 "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead."); 1028 List<byte[]> columnFamilies = columnFamilyList.stream() 1029 .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); 1030 FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, 1031 ng.getNonceGroup(), ng.newNonce()); 1032 CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( 1033 tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), 1034 (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); 1035 CompletableFuture<Void> future = new CompletableFuture<>(); 1036 addListener(procFuture, (ret, error) -> { 1037 if (error != null) { 1038 if ( 1039 error instanceof TableNotFoundException || error instanceof TableNotEnabledException 1040 || error instanceof NoSuchColumnFamilyException 1041 ) { 1042 future.completeExceptionally(error); 1043 } else if (error instanceof DoNotRetryIOException) { 1044 // usually this is caused by the method is not present on the server or 1045 // the hbase hadoop version does not match the running hadoop version. 1046 // if that happens, we need fall back to the old flush implementation. 1047 LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error); 1048 legacyFlush(future, tableName, columnFamilies); 1049 } else { 1050 future.completeExceptionally(error); 1051 } 1052 } else { 1053 future.complete(ret); 1054 } 1055 }); 1056 return future; 1057 } 1058 1059 private void legacyFlush(CompletableFuture<Void> future, TableName tableName, 1060 List<byte[]> columnFamilies) { 1061 addListener(tableExists(tableName), (exists, err) -> { 1062 if (err != null) { 1063 future.completeExceptionally(err); 1064 } else if (!exists) { 1065 future.completeExceptionally(new TableNotFoundException(tableName)); 1066 } else { 1067 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 1068 if (err2 != null) { 1069 future.completeExceptionally(err2); 1070 } else if (!tableEnabled) { 1071 future.completeExceptionally(new TableNotEnabledException(tableName)); 1072 } else { 1073 Map<String, String> props = new HashMap<>(); 1074 if (columnFamilies != null && !columnFamilies.isEmpty()) { 1075 props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER 1076 .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); 1077 } 1078 addListener( 1079 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 1080 (ret, err3) -> { 1081 if (err3 != null) { 1082 future.completeExceptionally(err3); 1083 } else { 1084 future.complete(ret); 1085 } 1086 }); 1087 } 1088 }); 1089 } 1090 }); 1091 } 1092 1093 @Override 1094 public CompletableFuture<Void> flushRegion(byte[] regionName) { 1095 return flushRegionInternal(regionName, null, false).thenAccept(r -> { 1096 }); 1097 } 1098 1099 @Override 1100 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 1101 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1102 + "If you don't specify a columnFamily, use flushRegion(regionName) instead"); 1103 return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> { 1104 }); 1105 } 1106 1107 /** 1108 * This method is for internal use only, where we need the response of the flush. 1109 * <p/> 1110 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public 1111 * API. 1112 */ 1113 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily, 1114 boolean writeFlushWALMarker) { 1115 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>(); 1116 addListener(getRegionLocation(regionName), (location, err) -> { 1117 if (err != null) { 1118 future.completeExceptionally(err); 1119 return; 1120 } 1121 ServerName serverName = location.getServerName(); 1122 if (serverName == null) { 1123 future 1124 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1125 return; 1126 } 1127 addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker), 1128 (ret, err2) -> { 1129 if (err2 != null) { 1130 future.completeExceptionally(err2); 1131 } else { 1132 future.complete(ret); 1133 } 1134 }); 1135 }); 1136 return future; 1137 } 1138 1139 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo, 1140 byte[] columnFamily, boolean writeFlushWALMarker) { 1141 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName) 1142 .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 1143 FlushRegionResponse> adminCall(controller, stub, 1144 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, 1145 writeFlushWALMarker), 1146 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) 1147 .call(); 1148 } 1149 1150 @Override 1151 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 1152 CompletableFuture<Void> future = new CompletableFuture<>(); 1153 addListener(getRegions(sn), (hRegionInfos, err) -> { 1154 if (err != null) { 1155 future.completeExceptionally(err); 1156 return; 1157 } 1158 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1159 if (hRegionInfos != null) { 1160 hRegionInfos 1161 .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> { 1162 }))); 1163 } 1164 addListener(CompletableFuture.allOf( 1165 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1166 if (err2 != null) { 1167 future.completeExceptionally(err2); 1168 } else { 1169 future.complete(ret); 1170 } 1171 }); 1172 }); 1173 return future; 1174 } 1175 1176 @Override 1177 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1178 return compact(tableName, null, false, compactType); 1179 } 1180 1181 @Override 1182 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1183 CompactType compactType) { 1184 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 1185 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1186 return compact(tableName, columnFamily, false, compactType); 1187 } 1188 1189 @Override 1190 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1191 return compactRegion(regionName, null, false); 1192 } 1193 1194 @Override 1195 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1196 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1197 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1198 return compactRegion(regionName, columnFamily, false); 1199 } 1200 1201 @Override 1202 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1203 return compact(tableName, null, true, compactType); 1204 } 1205 1206 @Override 1207 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1208 CompactType compactType) { 1209 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1210 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1211 return compact(tableName, columnFamily, true, compactType); 1212 } 1213 1214 @Override 1215 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1216 return compactRegion(regionName, null, true); 1217 } 1218 1219 @Override 1220 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1221 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1222 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1223 return compactRegion(regionName, columnFamily, true); 1224 } 1225 1226 @Override 1227 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1228 return compactRegionServer(sn, false); 1229 } 1230 1231 @Override 1232 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1233 return compactRegionServer(sn, true); 1234 } 1235 1236 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1237 CompletableFuture<Void> future = new CompletableFuture<>(); 1238 addListener(getRegions(sn), (hRegionInfos, err) -> { 1239 if (err != null) { 1240 future.completeExceptionally(err); 1241 return; 1242 } 1243 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1244 if (hRegionInfos != null) { 1245 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1246 } 1247 addListener(CompletableFuture.allOf( 1248 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1249 if (err2 != null) { 1250 future.completeExceptionally(err2); 1251 } else { 1252 future.complete(ret); 1253 } 1254 }); 1255 }); 1256 return future; 1257 } 1258 1259 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1260 boolean major) { 1261 CompletableFuture<Void> future = new CompletableFuture<>(); 1262 addListener(getRegionLocation(regionName), (location, err) -> { 1263 if (err != null) { 1264 future.completeExceptionally(err); 1265 return; 1266 } 1267 ServerName serverName = location.getServerName(); 1268 if (serverName == null) { 1269 future 1270 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1271 return; 1272 } 1273 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1274 (ret, err2) -> { 1275 if (err2 != null) { 1276 future.completeExceptionally(err2); 1277 } else { 1278 future.complete(ret); 1279 } 1280 }); 1281 }); 1282 return future; 1283 } 1284 1285 /** 1286 * List all region locations for the specific table. 1287 */ 1288 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1289 if (TableName.META_TABLE_NAME.equals(tableName)) { 1290 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1291 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1292 if (err != null) { 1293 future.completeExceptionally(err); 1294 } else if ( 1295 metaRegions == null || metaRegions.isEmpty() 1296 || metaRegions.getDefaultRegionLocation() == null 1297 ) { 1298 future.completeExceptionally(new IOException("meta region does not found")); 1299 } else { 1300 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1301 } 1302 }); 1303 return future; 1304 } else { 1305 // For non-meta table, we fetch all locations by scanning hbase:meta table 1306 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1307 } 1308 } 1309 1310 /** 1311 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1312 */ 1313 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1314 CompactType compactType) { 1315 CompletableFuture<Void> future = new CompletableFuture<>(); 1316 1317 switch (compactType) { 1318 case MOB: 1319 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1320 if (err != null) { 1321 future.completeExceptionally(err); 1322 return; 1323 } 1324 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1325 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1326 if (err2 != null) { 1327 future.completeExceptionally(err2); 1328 } else { 1329 future.complete(ret); 1330 } 1331 }); 1332 }); 1333 break; 1334 case NORMAL: 1335 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1336 if (err != null) { 1337 future.completeExceptionally(err); 1338 return; 1339 } 1340 if (locations == null || locations.isEmpty()) { 1341 future.completeExceptionally(new TableNotFoundException(tableName)); 1342 return; 1343 } 1344 CompletableFuture<?>[] compactFutures = 1345 locations.stream().filter(l -> l.getRegion() != null) 1346 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1347 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1348 .toArray(CompletableFuture<?>[]::new); 1349 // future complete unless all of the compact futures are completed. 1350 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1351 if (err2 != null) { 1352 future.completeExceptionally(err2); 1353 } else { 1354 future.complete(ret); 1355 } 1356 }); 1357 }); 1358 break; 1359 default: 1360 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1361 } 1362 return future; 1363 } 1364 1365 /** 1366 * Compact the region at specific region server. 1367 */ 1368 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1369 final boolean major, byte[] columnFamily) { 1370 return this.<Void> newAdminCaller().serverName(sn) 1371 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1372 Void> adminCall(controller, stub, 1373 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1374 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1375 .call(); 1376 } 1377 1378 private byte[] toEncodeRegionName(byte[] regionName) { 1379 return RegionInfo.isEncodedRegionName(regionName) 1380 ? regionName 1381 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1382 } 1383 1384 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1385 CompletableFuture<TableName> result) { 1386 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1387 if (err != null) { 1388 result.completeExceptionally(err); 1389 return; 1390 } 1391 RegionInfo regionInfo = location.getRegion(); 1392 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1393 result.completeExceptionally( 1394 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1395 return; 1396 } 1397 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1398 if (!tableName.get().equals(regionInfo.getTable())) { 1399 // tables of this two region should be same. 1400 result.completeExceptionally( 1401 new IllegalArgumentException("Cannot merge regions from two different tables " 1402 + tableName.get() + " and " + regionInfo.getTable())); 1403 } else { 1404 result.complete(tableName.get()); 1405 } 1406 } 1407 }); 1408 } 1409 1410 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) { 1411 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1412 CompletableFuture<TableName> future = new CompletableFuture<>(); 1413 for (byte[] encodedRegionName : encodedRegionNames) { 1414 checkAndGetTableName(encodedRegionName, tableNameRef, future); 1415 } 1416 return future; 1417 } 1418 1419 @Override 1420 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) { 1421 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE); 1422 } 1423 1424 @Override 1425 public CompletableFuture<Boolean> isMergeEnabled() { 1426 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1427 } 1428 1429 @Override 1430 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) { 1431 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT); 1432 } 1433 1434 @Override 1435 public CompletableFuture<Boolean> isSplitEnabled() { 1436 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1437 } 1438 1439 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1440 MasterSwitchType switchType) { 1441 SetSplitOrMergeEnabledRequest request = 1442 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1443 return this.<Boolean> newMasterCaller() 1444 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1445 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1446 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1447 (resp) -> resp.getPrevValueList().get(0))) 1448 .call(); 1449 } 1450 1451 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1452 IsSplitOrMergeEnabledRequest request = 1453 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1454 return this.<Boolean> newMasterCaller() 1455 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1456 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1457 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1458 .call(); 1459 } 1460 1461 @Override 1462 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1463 if (nameOfRegionsToMerge.size() < 2) { 1464 return failedFuture(new IllegalArgumentException( 1465 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1466 } 1467 CompletableFuture<Void> future = new CompletableFuture<>(); 1468 byte[][] encodedNameOfRegionsToMerge = 1469 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1470 1471 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1472 if (err != null) { 1473 future.completeExceptionally(err); 1474 return; 1475 } 1476 1477 final MergeTableRegionsRequest request; 1478 try { 1479 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1480 forcible, ng.getNonceGroup(), ng.newNonce()); 1481 } catch (DeserializationException e) { 1482 future.completeExceptionally(e); 1483 return; 1484 } 1485 1486 addListener( 1487 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1488 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1489 (ret, err2) -> { 1490 if (err2 != null) { 1491 future.completeExceptionally(err2); 1492 } else { 1493 future.complete(ret); 1494 } 1495 }); 1496 }); 1497 return future; 1498 } 1499 1500 @Override 1501 public CompletableFuture<Void> split(TableName tableName) { 1502 CompletableFuture<Void> future = new CompletableFuture<>(); 1503 addListener(tableExists(tableName), (exist, error) -> { 1504 if (error != null) { 1505 future.completeExceptionally(error); 1506 return; 1507 } 1508 if (!exist) { 1509 future.completeExceptionally(new TableNotFoundException(tableName)); 1510 return; 1511 } 1512 addListener(metaTable 1513 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1514 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName, 1515 ClientMetaTableAccessor.QueryType.REGION)) 1516 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName, 1517 ClientMetaTableAccessor.QueryType.REGION))), 1518 (results, err2) -> { 1519 if (err2 != null) { 1520 future.completeExceptionally(err2); 1521 return; 1522 } 1523 if (results != null && !results.isEmpty()) { 1524 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1525 for (Result r : results) { 1526 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) { 1527 continue; 1528 } 1529 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r); 1530 if (rl != null) { 1531 for (HRegionLocation h : rl.getRegionLocations()) { 1532 if (h != null && h.getServerName() != null) { 1533 RegionInfo hri = h.getRegion(); 1534 if ( 1535 hri == null || hri.isSplitParent() 1536 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1537 ) { 1538 continue; 1539 } 1540 splitFutures.add(split(hri, null)); 1541 } 1542 } 1543 } 1544 } 1545 addListener( 1546 CompletableFuture 1547 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1548 (ret, exception) -> { 1549 if (exception != null) { 1550 future.completeExceptionally(exception); 1551 return; 1552 } 1553 future.complete(ret); 1554 }); 1555 } else { 1556 future.complete(null); 1557 } 1558 }); 1559 }); 1560 return future; 1561 } 1562 1563 @Override 1564 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1565 CompletableFuture<Void> result = new CompletableFuture<>(); 1566 if (splitPoint == null) { 1567 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1568 } 1569 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1570 (loc, err) -> { 1571 if (err != null) { 1572 result.completeExceptionally(err); 1573 } else if (loc == null || loc.getRegion() == null) { 1574 result.completeExceptionally(new IllegalArgumentException( 1575 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1576 } else { 1577 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1578 if (err2 != null) { 1579 result.completeExceptionally(err2); 1580 } else { 1581 result.complete(ret); 1582 } 1583 1584 }); 1585 } 1586 }); 1587 return result; 1588 } 1589 1590 @Override 1591 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1592 CompletableFuture<Void> future = new CompletableFuture<>(); 1593 addListener(getRegionLocation(regionName), (location, err) -> { 1594 if (err != null) { 1595 future.completeExceptionally(err); 1596 return; 1597 } 1598 RegionInfo regionInfo = location.getRegion(); 1599 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1600 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1601 + "Replicas are auto-split when their primary is split.")); 1602 return; 1603 } 1604 ServerName serverName = location.getServerName(); 1605 if (serverName == null) { 1606 future 1607 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1608 return; 1609 } 1610 addListener(split(regionInfo, null), (ret, err2) -> { 1611 if (err2 != null) { 1612 future.completeExceptionally(err2); 1613 } else { 1614 future.complete(ret); 1615 } 1616 }); 1617 }); 1618 return future; 1619 } 1620 1621 @Override 1622 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1623 Preconditions.checkNotNull(splitPoint, 1624 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1625 CompletableFuture<Void> future = new CompletableFuture<>(); 1626 addListener(getRegionLocation(regionName), (location, err) -> { 1627 if (err != null) { 1628 future.completeExceptionally(err); 1629 return; 1630 } 1631 RegionInfo regionInfo = location.getRegion(); 1632 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1633 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " 1634 + "Replicas are auto-split when their primary is split.")); 1635 return; 1636 } 1637 ServerName serverName = location.getServerName(); 1638 if (serverName == null) { 1639 future 1640 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1641 return; 1642 } 1643 if ( 1644 regionInfo.getStartKey() != null 1645 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1646 ) { 1647 future.completeExceptionally( 1648 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1649 return; 1650 } 1651 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1652 if (err2 != null) { 1653 future.completeExceptionally(err2); 1654 } else { 1655 future.complete(ret); 1656 } 1657 }); 1658 }); 1659 return future; 1660 } 1661 1662 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1663 CompletableFuture<Void> future = new CompletableFuture<>(); 1664 TableName tableName = hri.getTable(); 1665 final SplitTableRegionRequest request; 1666 try { 1667 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1668 ng.newNonce()); 1669 } catch (DeserializationException e) { 1670 future.completeExceptionally(e); 1671 return future; 1672 } 1673 1674 addListener( 1675 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1676 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1677 (ret, err2) -> { 1678 if (err2 != null) { 1679 future.completeExceptionally(err2); 1680 } else { 1681 future.complete(ret); 1682 } 1683 }); 1684 return future; 1685 } 1686 1687 @Override 1688 public CompletableFuture<Void> truncateRegion(byte[] regionName) { 1689 CompletableFuture<Void> future = new CompletableFuture<>(); 1690 addListener(getRegionLocation(regionName), (location, err) -> { 1691 if (err != null) { 1692 future.completeExceptionally(err); 1693 return; 1694 } 1695 RegionInfo regionInfo = location.getRegion(); 1696 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1697 future.completeExceptionally(new IllegalArgumentException( 1698 "Can't truncate replicas directly.Replicas are auto-truncated " 1699 + "when their primary is truncated.")); 1700 return; 1701 } 1702 ServerName serverName = location.getServerName(); 1703 if (serverName == null) { 1704 future 1705 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1706 return; 1707 } 1708 addListener(truncateRegion(regionInfo), (ret, err2) -> { 1709 if (err2 != null) { 1710 future.completeExceptionally(err2); 1711 } else { 1712 future.complete(ret); 1713 } 1714 }); 1715 }); 1716 return future; 1717 } 1718 1719 private CompletableFuture<Void> truncateRegion(final RegionInfo hri) { 1720 CompletableFuture<Void> future = new CompletableFuture<>(); 1721 TableName tableName = hri.getTable(); 1722 final MasterProtos.TruncateRegionRequest request; 1723 try { 1724 request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce()); 1725 } catch (DeserializationException e) { 1726 future.completeExceptionally(e); 1727 return future; 1728 } 1729 addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion, 1730 MasterProtos.TruncateRegionResponse::getProcId, 1731 new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1732 if (err2 != null) { 1733 future.completeExceptionally(err2); 1734 } else { 1735 future.complete(ret); 1736 } 1737 }); 1738 return future; 1739 } 1740 1741 @Override 1742 public CompletableFuture<Void> assign(byte[] regionName) { 1743 CompletableFuture<Void> future = new CompletableFuture<>(); 1744 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1745 if (err != null) { 1746 future.completeExceptionally(err); 1747 return; 1748 } 1749 addListener( 1750 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1751 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1752 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1753 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1754 .call(), 1755 (ret, err2) -> { 1756 if (err2 != null) { 1757 future.completeExceptionally(err2); 1758 } else { 1759 future.complete(ret); 1760 } 1761 }); 1762 }); 1763 return future; 1764 } 1765 1766 @Override 1767 public CompletableFuture<Void> unassign(byte[] regionName) { 1768 CompletableFuture<Void> future = new CompletableFuture<>(); 1769 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1770 if (err != null) { 1771 future.completeExceptionally(err); 1772 return; 1773 } 1774 addListener( 1775 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1776 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1777 Void> call(controller, stub, 1778 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1779 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1780 .call(), 1781 (ret, err2) -> { 1782 if (err2 != null) { 1783 future.completeExceptionally(err2); 1784 } else { 1785 future.complete(ret); 1786 } 1787 }); 1788 }); 1789 return future; 1790 } 1791 1792 @Override 1793 public CompletableFuture<Void> offline(byte[] regionName) { 1794 CompletableFuture<Void> future = new CompletableFuture<>(); 1795 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1796 if (err != null) { 1797 future.completeExceptionally(err); 1798 return; 1799 } 1800 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1801 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1802 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1803 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) 1804 .call(), (ret, err2) -> { 1805 if (err2 != null) { 1806 future.completeExceptionally(err2); 1807 } else { 1808 future.complete(ret); 1809 } 1810 }); 1811 }); 1812 return future; 1813 } 1814 1815 @Override 1816 public CompletableFuture<Void> move(byte[] regionName) { 1817 CompletableFuture<Void> future = new CompletableFuture<>(); 1818 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1819 if (err != null) { 1820 future.completeExceptionally(err); 1821 return; 1822 } 1823 addListener( 1824 moveRegion(regionInfo, 1825 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1826 (ret, err2) -> { 1827 if (err2 != null) { 1828 future.completeExceptionally(err2); 1829 } else { 1830 future.complete(ret); 1831 } 1832 }); 1833 }); 1834 return future; 1835 } 1836 1837 @Override 1838 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1839 Preconditions.checkNotNull(destServerName, 1840 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1841 CompletableFuture<Void> future = new CompletableFuture<>(); 1842 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1843 if (err != null) { 1844 future.completeExceptionally(err); 1845 return; 1846 } 1847 addListener( 1848 moveRegion(regionInfo, RequestConverter 1849 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1850 (ret, err2) -> { 1851 if (err2 != null) { 1852 future.completeExceptionally(err2); 1853 } else { 1854 future.complete(ret); 1855 } 1856 }); 1857 }); 1858 return future; 1859 } 1860 1861 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1862 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1863 .action( 1864 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1865 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1866 .call(); 1867 } 1868 1869 @Override 1870 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1871 return this.<Void> newMasterCaller() 1872 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1873 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1874 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1875 .call(); 1876 } 1877 1878 @Override 1879 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1880 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1881 Scan scan = QuotaTableUtil.makeScan(filter); 1882 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1883 new AdvancedScanResultConsumer() { 1884 List<QuotaSettings> settings = new ArrayList<>(); 1885 1886 @Override 1887 public void onNext(Result[] results, ScanController controller) { 1888 for (Result result : results) { 1889 try { 1890 QuotaTableUtil.parseResultToCollection(result, settings); 1891 } catch (IOException e) { 1892 controller.terminate(); 1893 future.completeExceptionally(e); 1894 } 1895 } 1896 } 1897 1898 @Override 1899 public void onError(Throwable error) { 1900 future.completeExceptionally(error); 1901 } 1902 1903 @Override 1904 public void onComplete() { 1905 future.complete(settings); 1906 } 1907 }); 1908 return future; 1909 } 1910 1911 @Override 1912 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1913 boolean enabled) { 1914 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1915 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1916 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1917 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1918 } 1919 1920 @Override 1921 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1922 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1923 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1924 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1925 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1926 } 1927 1928 @Override 1929 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1930 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1931 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1932 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1933 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1934 } 1935 1936 @Override 1937 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1938 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1939 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1940 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1941 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1942 } 1943 1944 @Override 1945 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1946 return this.<ReplicationPeerConfig> newMasterCaller() 1947 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1948 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1949 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1950 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1951 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1952 .call(); 1953 } 1954 1955 @Override 1956 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1957 ReplicationPeerConfig peerConfig) { 1958 return this.<UpdateReplicationPeerConfigRequest, 1959 UpdateReplicationPeerConfigResponse> procedureCall( 1960 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1961 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1962 (resp) -> resp.getProcId(), 1963 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1964 } 1965 1966 @Override 1967 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId, 1968 SyncReplicationState clusterState) { 1969 return this.<TransitReplicationPeerSyncReplicationStateRequest, 1970 TransitReplicationPeerSyncReplicationStateResponse> procedureCall( 1971 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId, 1972 clusterState), 1973 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done), 1974 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId, 1975 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE")); 1976 } 1977 1978 @Override 1979 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1980 Map<TableName, List<String>> tableCfs) { 1981 if (tableCfs == null) { 1982 return failedFuture(new ReplicationException("tableCfs is null")); 1983 } 1984 1985 CompletableFuture<Void> future = new CompletableFuture<>(); 1986 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1987 if (!completeExceptionally(future, error)) { 1988 ReplicationPeerConfig newPeerConfig = 1989 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1990 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1991 if (!completeExceptionally(future, error)) { 1992 future.complete(result); 1993 } 1994 }); 1995 } 1996 }); 1997 return future; 1998 } 1999 2000 @Override 2001 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 2002 Map<TableName, List<String>> tableCfs) { 2003 if (tableCfs == null) { 2004 return failedFuture(new ReplicationException("tableCfs is null")); 2005 } 2006 2007 CompletableFuture<Void> future = new CompletableFuture<>(); 2008 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 2009 if (!completeExceptionally(future, error)) { 2010 ReplicationPeerConfig newPeerConfig = null; 2011 try { 2012 newPeerConfig = ReplicationPeerConfigUtil 2013 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 2014 } catch (ReplicationException e) { 2015 future.completeExceptionally(e); 2016 return; 2017 } 2018 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 2019 if (!completeExceptionally(future, error)) { 2020 future.complete(result); 2021 } 2022 }); 2023 } 2024 }); 2025 return future; 2026 } 2027 2028 @Override 2029 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 2030 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 2031 } 2032 2033 @Override 2034 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 2035 Preconditions.checkNotNull(pattern, 2036 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 2037 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 2038 } 2039 2040 private CompletableFuture<List<ReplicationPeerDescription>> 2041 listReplicationPeers(ListReplicationPeersRequest request) { 2042 return this.<List<ReplicationPeerDescription>> newMasterCaller() 2043 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 2044 List<ReplicationPeerDescription>> call(controller, stub, request, 2045 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 2046 (resp) -> resp.getPeerDescList().stream() 2047 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 2048 .collect(Collectors.toList()))) 2049 .call(); 2050 } 2051 2052 @Override 2053 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 2054 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 2055 addListener(listTableDescriptors(), (tables, error) -> { 2056 if (!completeExceptionally(future, error)) { 2057 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 2058 tables.forEach(table -> { 2059 Map<String, Integer> cfs = new HashMap<>(); 2060 Stream.of(table.getColumnFamilies()) 2061 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 2062 .forEach(column -> { 2063 cfs.put(column.getNameAsString(), column.getScope()); 2064 }); 2065 if (!cfs.isEmpty()) { 2066 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 2067 } 2068 }); 2069 future.complete(replicatedTableCFs); 2070 } 2071 }); 2072 return future; 2073 } 2074 2075 @Override 2076 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 2077 SnapshotProtos.SnapshotDescription snapshot = 2078 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 2079 try { 2080 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2081 } catch (IllegalArgumentException e) { 2082 return failedFuture(e); 2083 } 2084 CompletableFuture<Void> future = new CompletableFuture<>(); 2085 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) 2086 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); 2087 addListener(this.<SnapshotResponse> newMasterCaller() 2088 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call( 2089 controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) 2090 .call(), (resp, err) -> { 2091 if (err != null) { 2092 future.completeExceptionally(err); 2093 return; 2094 } 2095 waitSnapshotFinish(snapshotDesc, future, resp); 2096 }); 2097 return future; 2098 } 2099 2100 // This is for keeping compatibility with old implementation. 2101 // If there is a procId field in the response, then the snapshot will be operated with a 2102 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. 2103 private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, 2104 SnapshotResponse resp) { 2105 if (resp.hasProcId()) { 2106 getProcedureResult(resp.getProcId(), src -> null, future, 0); 2107 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); 2108 } else { 2109 long expectedTimeout = resp.getExpectedTimeout(); 2110 TimerTask pollingTask = new TimerTask() { 2111 int tries = 0; 2112 long startTime = EnvironmentEdgeManager.currentTime(); 2113 long endTime = startTime + expectedTimeout; 2114 long maxPauseTime = expectedTimeout / maxAttempts; 2115 2116 @Override 2117 public void run(Timeout timeout) throws Exception { 2118 if (EnvironmentEdgeManager.currentTime() < endTime) { 2119 addListener(isSnapshotFinished(snapshot), (done, err2) -> { 2120 if (err2 != null) { 2121 future.completeExceptionally(err2); 2122 } else if (done) { 2123 future.complete(null); 2124 } else { 2125 // retry again after pauseTime. 2126 long pauseTime = 2127 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2128 pauseTime = Math.min(pauseTime, maxPauseTime); 2129 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); 2130 } 2131 }); 2132 } else { 2133 future 2134 .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName() 2135 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot)); 2136 } 2137 } 2138 }; 2139 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2140 } 2141 } 2142 2143 @Override 2144 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 2145 return this.<Boolean> newMasterCaller() 2146 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 2147 Boolean> call(controller, stub, 2148 IsSnapshotDoneRequest.newBuilder() 2149 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2150 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 2151 .call(); 2152 } 2153 2154 @Override 2155 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 2156 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 2157 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 2158 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 2159 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 2160 } 2161 2162 @Override 2163 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 2164 boolean restoreAcl) { 2165 CompletableFuture<Void> future = new CompletableFuture<>(); 2166 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 2167 if (err != null) { 2168 future.completeExceptionally(err); 2169 return; 2170 } 2171 TableName tableName = null; 2172 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 2173 for (SnapshotDescription snap : snapshotDescriptions) { 2174 if (snap.getName().equals(snapshotName)) { 2175 tableName = snap.getTableName(); 2176 break; 2177 } 2178 } 2179 } 2180 if (tableName == null) { 2181 future.completeExceptionally( 2182 new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist.")); 2183 return; 2184 } 2185 final TableName finalTableName = tableName; 2186 addListener(tableExists(finalTableName), (exists, err2) -> { 2187 if (err2 != null) { 2188 future.completeExceptionally(err2); 2189 } else if (!exists) { 2190 // if table does not exist, then just clone snapshot into new table. 2191 completeConditionalOnFuture(future, 2192 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 2193 } else { 2194 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 2195 if (err4 != null) { 2196 future.completeExceptionally(err4); 2197 } else if (!disabled) { 2198 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 2199 } else { 2200 completeConditionalOnFuture(future, 2201 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 2202 } 2203 }); 2204 } 2205 }); 2206 }); 2207 return future; 2208 } 2209 2210 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 2211 boolean takeFailSafeSnapshot, boolean restoreAcl) { 2212 if (takeFailSafeSnapshot) { 2213 CompletableFuture<Void> future = new CompletableFuture<>(); 2214 // Step.1 Take a snapshot of the current state 2215 String failSafeSnapshotSnapshotNameFormat = 2216 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 2217 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 2218 final String failSafeSnapshotSnapshotName = 2219 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 2220 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 2221 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 2222 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2223 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 2224 if (err != null) { 2225 future.completeExceptionally(err); 2226 } else { 2227 // Step.2 Restore snapshot 2228 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 2229 (void2, err2) -> { 2230 if (err2 != null) { 2231 // Step.3.a Something went wrong during the restore and try to rollback. 2232 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 2233 restoreAcl, null), (void3, err3) -> { 2234 if (err3 != null) { 2235 future.completeExceptionally(err3); 2236 } else { 2237 // If fail to restore snapshot but rollback successfully, delete the 2238 // restore-failsafe snapshot. 2239 LOG.info( 2240 "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2241 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> { 2242 if (err4 != null) { 2243 LOG.error("Unable to remove the failsafe snapshot: {}", 2244 failSafeSnapshotSnapshotName, err4); 2245 } 2246 String msg = 2247 "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot=" 2248 + failSafeSnapshotSnapshotName + " succeeded."; 2249 LOG.error(msg); 2250 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2251 }); 2252 } 2253 }); 2254 } else { 2255 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 2256 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2257 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2258 if (err3 != null) { 2259 LOG.error( 2260 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2261 err3); 2262 } 2263 future.complete(ret3); 2264 }); 2265 } 2266 }); 2267 } 2268 }); 2269 return future; 2270 } else { 2271 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2272 } 2273 } 2274 2275 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2276 CompletableFuture<T> parentFuture) { 2277 addListener(parentFuture, (res, err) -> { 2278 if (err != null) { 2279 dependentFuture.completeExceptionally(err); 2280 } else { 2281 dependentFuture.complete(res); 2282 } 2283 }); 2284 } 2285 2286 @Override 2287 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2288 boolean restoreAcl, String customSFT) { 2289 CompletableFuture<Void> future = new CompletableFuture<>(); 2290 addListener(tableExists(tableName), (exists, err) -> { 2291 if (err != null) { 2292 future.completeExceptionally(err); 2293 } else if (exists) { 2294 future.completeExceptionally(new TableExistsException(tableName)); 2295 } else { 2296 completeConditionalOnFuture(future, 2297 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2298 } 2299 }); 2300 return future; 2301 } 2302 2303 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2304 boolean restoreAcl, String customSFT) { 2305 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2306 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2307 try { 2308 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2309 } catch (IllegalArgumentException e) { 2310 return failedFuture(e); 2311 } 2312 RestoreSnapshotRequest.Builder builder = 2313 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2314 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2315 if (customSFT != null) { 2316 builder.setCustomSFT(customSFT); 2317 } 2318 return waitProcedureResult(this.<Long> newMasterCaller() 2319 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2320 Long> call(controller, stub, builder.build(), 2321 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2322 .call(), result -> null); 2323 } 2324 2325 @Override 2326 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2327 return getCompletedSnapshots(null); 2328 } 2329 2330 @Override 2331 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2332 Preconditions.checkNotNull(pattern, 2333 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2334 return getCompletedSnapshots(pattern); 2335 } 2336 2337 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2338 return this.<List<SnapshotDescription>> newMasterCaller() 2339 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2340 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2341 GetCompletedSnapshotsRequest.newBuilder().build(), 2342 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2343 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2344 .call(); 2345 } 2346 2347 @Override 2348 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2349 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2350 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2351 return getCompletedSnapshots(tableNamePattern, null); 2352 } 2353 2354 @Override 2355 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2356 Pattern snapshotNamePattern) { 2357 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2358 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2359 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2360 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2361 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2362 } 2363 2364 private CompletableFuture<List<SnapshotDescription>> 2365 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2366 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2367 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2368 if (err != null) { 2369 future.completeExceptionally(err); 2370 return; 2371 } 2372 if (tableNames == null || tableNames.size() <= 0) { 2373 future.complete(Collections.emptyList()); 2374 return; 2375 } 2376 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2377 if (err2 != null) { 2378 future.completeExceptionally(err2); 2379 return; 2380 } 2381 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2382 future.complete(Collections.emptyList()); 2383 return; 2384 } 2385 future.complete(snapshotDescList.stream() 2386 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2387 .collect(Collectors.toList())); 2388 }); 2389 }); 2390 return future; 2391 } 2392 2393 @Override 2394 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2395 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2396 } 2397 2398 @Override 2399 public CompletableFuture<Void> deleteSnapshots() { 2400 return internalDeleteSnapshots(null, null); 2401 } 2402 2403 @Override 2404 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2405 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2406 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2407 return internalDeleteSnapshots(null, snapshotNamePattern); 2408 } 2409 2410 @Override 2411 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2412 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2413 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2414 return internalDeleteSnapshots(tableNamePattern, null); 2415 } 2416 2417 @Override 2418 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2419 Pattern snapshotNamePattern) { 2420 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2421 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2422 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2423 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2424 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2425 } 2426 2427 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2428 Pattern snapshotNamePattern) { 2429 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2430 if (tableNamePattern == null) { 2431 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2432 } else { 2433 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2434 } 2435 CompletableFuture<Void> future = new CompletableFuture<>(); 2436 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2437 if (err != null) { 2438 future.completeExceptionally(err); 2439 return; 2440 } 2441 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2442 future.complete(null); 2443 return; 2444 } 2445 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2446 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2447 if (e != null) { 2448 future.completeExceptionally(e); 2449 } else { 2450 future.complete(v); 2451 } 2452 }); 2453 }); 2454 return future; 2455 } 2456 2457 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2458 return this.<Void> newMasterCaller() 2459 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2460 controller, stub, 2461 DeleteSnapshotRequest.newBuilder() 2462 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2463 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2464 .call(); 2465 } 2466 2467 @Override 2468 public CompletableFuture<Void> execProcedure(String signature, String instance, 2469 Map<String, String> props) { 2470 CompletableFuture<Void> future = new CompletableFuture<>(); 2471 ProcedureDescription procDesc = 2472 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2473 addListener(this.<Long> newMasterCaller() 2474 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2475 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2476 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2477 .call(), (expectedTimeout, err) -> { 2478 if (err != null) { 2479 future.completeExceptionally(err); 2480 return; 2481 } 2482 TimerTask pollingTask = new TimerTask() { 2483 int tries = 0; 2484 long startTime = EnvironmentEdgeManager.currentTime(); 2485 long endTime = startTime + expectedTimeout; 2486 long maxPauseTime = expectedTimeout / maxAttempts; 2487 2488 @Override 2489 public void run(Timeout timeout) throws Exception { 2490 if (EnvironmentEdgeManager.currentTime() < endTime) { 2491 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2492 if (err2 != null) { 2493 future.completeExceptionally(err2); 2494 return; 2495 } 2496 if (done) { 2497 future.complete(null); 2498 } else { 2499 // retry again after pauseTime. 2500 long pauseTime = 2501 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2502 pauseTime = Math.min(pauseTime, maxPauseTime); 2503 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2504 TimeUnit.MICROSECONDS); 2505 } 2506 }); 2507 } else { 2508 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2509 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2510 } 2511 } 2512 }; 2513 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2514 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2515 }); 2516 return future; 2517 } 2518 2519 @Override 2520 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2521 Map<String, String> props) { 2522 ProcedureDescription proDesc = 2523 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2524 return this.<byte[]> newMasterCaller() 2525 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2526 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2527 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2528 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2529 .call(); 2530 } 2531 2532 @Override 2533 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2534 Map<String, String> props) { 2535 ProcedureDescription proDesc = 2536 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2537 return this.<Boolean> newMasterCaller() 2538 .action( 2539 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2540 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2541 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2542 .call(); 2543 } 2544 2545 @Override 2546 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2547 return this.<Boolean> newMasterCaller().action( 2548 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2549 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2550 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2551 .call(); 2552 } 2553 2554 @Override 2555 public CompletableFuture<String> getProcedures() { 2556 return this.<String> newMasterCaller() 2557 .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2558 controller, stub, GetProceduresRequest.newBuilder().build(), 2559 (s, c, req, done) -> s.getProcedures(c, req, done), 2560 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2561 .call(); 2562 } 2563 2564 @Override 2565 public CompletableFuture<String> getLocks() { 2566 return this.<String> newMasterCaller() 2567 .action( 2568 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2569 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2570 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2571 .call(); 2572 } 2573 2574 @Override 2575 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2576 boolean offload) { 2577 return this.<Void> newMasterCaller() 2578 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2579 DecommissionRegionServersResponse, Void> call(controller, stub, 2580 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2581 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2582 .call(); 2583 } 2584 2585 @Override 2586 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2587 return this.<List<ServerName>> newMasterCaller() 2588 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2589 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2590 ListDecommissionedRegionServersRequest.newBuilder().build(), 2591 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2592 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2593 .collect(Collectors.toList()))) 2594 .call(); 2595 } 2596 2597 @Override 2598 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2599 List<byte[]> encodedRegionNames) { 2600 return this.<Void> newMasterCaller() 2601 .action((controller, stub) -> this.<RecommissionRegionServerRequest, 2602 RecommissionRegionServerResponse, Void> call(controller, stub, 2603 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2604 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2605 .call(); 2606 } 2607 2608 /** 2609 * Get the region location for the passed region name. The region name may be a full region name 2610 * or encoded region name. If the region does not found, then it'll throw an 2611 * UnknownRegionException wrapped by a {@link CompletableFuture} 2612 * @param regionNameOrEncodedRegionName region name or encoded region name 2613 * @return region location, wrapped by a {@link CompletableFuture} 2614 */ 2615 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2616 if (regionNameOrEncodedRegionName == null) { 2617 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2618 } 2619 2620 CompletableFuture<Optional<HRegionLocation>> future; 2621 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2622 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2623 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2624 // old format encodedName, should be meta region 2625 future = connection.registry.getMetaRegionLocations() 2626 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2627 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2628 } else { 2629 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2630 regionNameOrEncodedRegionName); 2631 } 2632 } else { 2633 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2634 // it needs to throw out IllegalArgumentException in case tableName is passed in. 2635 RegionInfo regionInfo; 2636 try { 2637 regionInfo = 2638 CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2639 } catch (IOException ioe) { 2640 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2641 } 2642 2643 if (regionInfo.isMetaRegion()) { 2644 future = connection.registry.getMetaRegionLocations() 2645 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2646 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2647 .findFirst()); 2648 } else { 2649 future = 2650 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2651 } 2652 } 2653 2654 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2655 addListener(future, (location, err) -> { 2656 if (err != null) { 2657 returnedFuture.completeExceptionally(err); 2658 return; 2659 } 2660 if (!location.isPresent() || location.get().getRegion() == null) { 2661 returnedFuture.completeExceptionally( 2662 new UnknownRegionException("Invalid region name or encoded region name: " 2663 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2664 } else { 2665 returnedFuture.complete(location.get()); 2666 } 2667 }); 2668 return returnedFuture; 2669 } 2670 2671 /** 2672 * Get the region info for the passed region name. The region name may be a full region name or 2673 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException 2674 * wrapped by a {@link CompletableFuture} 2675 * @return region info, wrapped by a {@link CompletableFuture} 2676 */ 2677 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2678 if (regionNameOrEncodedRegionName == null) { 2679 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2680 } 2681 2682 if ( 2683 Bytes.equals(regionNameOrEncodedRegionName, 2684 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2685 || Bytes.equals(regionNameOrEncodedRegionName, 2686 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2687 ) { 2688 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2689 } 2690 2691 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2692 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2693 if (err != null) { 2694 future.completeExceptionally(err); 2695 } else { 2696 future.complete(location.getRegion()); 2697 } 2698 }); 2699 return future; 2700 } 2701 2702 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2703 if (numRegions < 3) { 2704 throw new IllegalArgumentException("Must create at least three regions"); 2705 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2706 throw new IllegalArgumentException("Start key must be smaller than end key"); 2707 } 2708 if (numRegions == 3) { 2709 return new byte[][] { startKey, endKey }; 2710 } 2711 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2712 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2713 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2714 } 2715 return splitKeys; 2716 } 2717 2718 private void verifySplitKeys(byte[][] splitKeys) { 2719 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2720 // Verify there are no duplicate split keys 2721 byte[] lastKey = null; 2722 for (byte[] splitKey : splitKeys) { 2723 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2724 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2725 } 2726 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2727 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2728 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2729 } 2730 lastKey = splitKey; 2731 } 2732 } 2733 2734 private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> { 2735 2736 abstract void onFinished(); 2737 2738 abstract void onError(Throwable error); 2739 2740 @Override 2741 public void accept(T value, Throwable error) { 2742 if (error != null) { 2743 onError(error); 2744 return; 2745 } 2746 onFinished(); 2747 } 2748 } 2749 2750 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> { 2751 protected final TableName tableName; 2752 2753 TableProcedureBiConsumer(TableName tableName) { 2754 this.tableName = tableName; 2755 } 2756 2757 abstract String getOperationType(); 2758 2759 String getDescription() { 2760 return "Operation: " + getOperationType() + ", " + "Table Name: " 2761 + tableName.getNameWithNamespaceInclAsString(); 2762 } 2763 2764 @Override 2765 void onFinished() { 2766 LOG.info(getDescription() + " completed"); 2767 } 2768 2769 @Override 2770 void onError(Throwable error) { 2771 LOG.info(getDescription() + " failed with " + error.getMessage()); 2772 } 2773 } 2774 2775 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> { 2776 protected final String namespaceName; 2777 2778 NamespaceProcedureBiConsumer(String namespaceName) { 2779 this.namespaceName = namespaceName; 2780 } 2781 2782 abstract String getOperationType(); 2783 2784 String getDescription() { 2785 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2786 } 2787 2788 @Override 2789 void onFinished() { 2790 LOG.info("{} completed", getDescription()); 2791 } 2792 2793 @Override 2794 void onError(Throwable error) { 2795 LOG.info("{} failed with {}", getDescription(), error.getMessage()); 2796 } 2797 } 2798 2799 private static class RestoreBackupSystemTableProcedureBiConsumer extends ProcedureBiConsumer { 2800 2801 @Override 2802 void onFinished() { 2803 LOG.info("RestoreBackupSystemTableProcedure completed"); 2804 } 2805 2806 @Override 2807 void onError(Throwable error) { 2808 LOG.info("RestoreBackupSystemTableProcedure failed with {}", error.getMessage()); 2809 } 2810 } 2811 2812 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2813 2814 CreateTableProcedureBiConsumer(TableName tableName) { 2815 super(tableName); 2816 } 2817 2818 @Override 2819 String getOperationType() { 2820 return "CREATE"; 2821 } 2822 } 2823 2824 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2825 2826 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2827 super(tableName); 2828 } 2829 2830 @Override 2831 String getOperationType() { 2832 return "MODIFY"; 2833 } 2834 } 2835 2836 private static class ModifyTableStoreFileTrackerProcedureBiConsumer 2837 extends TableProcedureBiConsumer { 2838 2839 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2840 super(tableName); 2841 } 2842 2843 @Override 2844 String getOperationType() { 2845 return "MODIFY_TABLE_STORE_FILE_TRACKER"; 2846 } 2847 } 2848 2849 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2850 2851 DeleteTableProcedureBiConsumer(TableName tableName) { 2852 super(tableName); 2853 } 2854 2855 @Override 2856 String getOperationType() { 2857 return "DELETE"; 2858 } 2859 2860 @Override 2861 void onFinished() { 2862 connection.getLocator().clearCache(this.tableName); 2863 super.onFinished(); 2864 } 2865 } 2866 2867 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2868 2869 TruncateTableProcedureBiConsumer(TableName tableName) { 2870 super(tableName); 2871 } 2872 2873 @Override 2874 String getOperationType() { 2875 return "TRUNCATE"; 2876 } 2877 } 2878 2879 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2880 2881 EnableTableProcedureBiConsumer(TableName tableName) { 2882 super(tableName); 2883 } 2884 2885 @Override 2886 String getOperationType() { 2887 return "ENABLE"; 2888 } 2889 } 2890 2891 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2892 2893 DisableTableProcedureBiConsumer(TableName tableName) { 2894 super(tableName); 2895 } 2896 2897 @Override 2898 String getOperationType() { 2899 return "DISABLE"; 2900 } 2901 } 2902 2903 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2904 2905 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2906 super(tableName); 2907 } 2908 2909 @Override 2910 String getOperationType() { 2911 return "ADD_COLUMN_FAMILY"; 2912 } 2913 } 2914 2915 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2916 2917 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2918 super(tableName); 2919 } 2920 2921 @Override 2922 String getOperationType() { 2923 return "DELETE_COLUMN_FAMILY"; 2924 } 2925 } 2926 2927 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2928 2929 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2930 super(tableName); 2931 } 2932 2933 @Override 2934 String getOperationType() { 2935 return "MODIFY_COLUMN_FAMILY"; 2936 } 2937 } 2938 2939 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer 2940 extends TableProcedureBiConsumer { 2941 2942 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) { 2943 super(tableName); 2944 } 2945 2946 @Override 2947 String getOperationType() { 2948 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER"; 2949 } 2950 } 2951 2952 private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { 2953 2954 FlushTableProcedureBiConsumer(TableName tableName) { 2955 super(tableName); 2956 } 2957 2958 @Override 2959 String getOperationType() { 2960 return "FLUSH"; 2961 } 2962 } 2963 2964 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2965 2966 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2967 super(namespaceName); 2968 } 2969 2970 @Override 2971 String getOperationType() { 2972 return "CREATE_NAMESPACE"; 2973 } 2974 } 2975 2976 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2977 2978 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2979 super(namespaceName); 2980 } 2981 2982 @Override 2983 String getOperationType() { 2984 return "DELETE_NAMESPACE"; 2985 } 2986 } 2987 2988 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2989 2990 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2991 super(namespaceName); 2992 } 2993 2994 @Override 2995 String getOperationType() { 2996 return "MODIFY_NAMESPACE"; 2997 } 2998 } 2999 3000 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3001 3002 MergeTableRegionProcedureBiConsumer(TableName tableName) { 3003 super(tableName); 3004 } 3005 3006 @Override 3007 String getOperationType() { 3008 return "MERGE_REGIONS"; 3009 } 3010 } 3011 3012 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3013 3014 SplitTableRegionProcedureBiConsumer(TableName tableName) { 3015 super(tableName); 3016 } 3017 3018 @Override 3019 String getOperationType() { 3020 return "SPLIT_REGION"; 3021 } 3022 } 3023 3024 private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer { 3025 3026 TruncateRegionProcedureBiConsumer(TableName tableName) { 3027 super(tableName); 3028 } 3029 3030 @Override 3031 String getOperationType() { 3032 return "TRUNCATE_REGION"; 3033 } 3034 } 3035 3036 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { 3037 SnapshotProcedureBiConsumer(TableName tableName) { 3038 super(tableName); 3039 } 3040 3041 @Override 3042 String getOperationType() { 3043 return "SNAPSHOT"; 3044 } 3045 } 3046 3047 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> { 3048 private final String peerId; 3049 private final Supplier<String> getOperation; 3050 3051 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 3052 this.peerId = peerId; 3053 this.getOperation = getOperation; 3054 } 3055 3056 String getDescription() { 3057 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 3058 } 3059 3060 @Override 3061 void onFinished() { 3062 LOG.info("{} completed", getDescription()); 3063 } 3064 3065 @Override 3066 void onError(Throwable error) { 3067 LOG.info("{} failed with {}", getDescription(), error.getMessage()); 3068 } 3069 } 3070 3071 private static final class RollAllWALWritersBiConsumer 3072 extends ProcedureBiConsumer<Map<ServerName, Long>> { 3073 3074 @Override 3075 void onFinished() { 3076 LOG.info("Rolling all WAL writers completed"); 3077 } 3078 3079 @Override 3080 void onError(Throwable error) { 3081 LOG.warn("Rolling all WAL writers failed with {}", error.getMessage()); 3082 } 3083 } 3084 3085 private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture, 3086 Converter<T, ByteString> converter) { 3087 CompletableFuture<T> future = new CompletableFuture<>(); 3088 addListener(procFuture, (procId, error) -> { 3089 if (error != null) { 3090 future.completeExceptionally(error); 3091 return; 3092 } 3093 getProcedureResult(procId, converter, future, 0); 3094 }); 3095 return future; 3096 } 3097 3098 private <T> void getProcedureResult(long procId, Converter<T, ByteString> converter, 3099 CompletableFuture<T> future, int retries) { 3100 addListener( 3101 this.<GetProcedureResultResponse> newMasterCaller() 3102 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 3103 GetProcedureResultResponse> call(controller, stub, 3104 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 3105 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 3106 .call(), 3107 (response, error) -> { 3108 if (error != null) { 3109 LOG.warn("failed to get the procedure result procId={}", procId, 3110 ConnectionUtils.translateException(error)); 3111 retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), 3112 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3113 return; 3114 } 3115 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 3116 retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), 3117 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 3118 return; 3119 } 3120 if (response.hasException()) { 3121 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 3122 future.completeExceptionally(ioe); 3123 } else { 3124 try { 3125 future.complete(converter.convert(response.getResult())); 3126 } catch (IOException e) { 3127 future.completeExceptionally(e); 3128 } 3129 } 3130 }); 3131 } 3132 3133 private <T> CompletableFuture<T> failedFuture(Throwable error) { 3134 CompletableFuture<T> future = new CompletableFuture<>(); 3135 future.completeExceptionally(error); 3136 return future; 3137 } 3138 3139 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 3140 if (error != null) { 3141 future.completeExceptionally(error); 3142 return true; 3143 } 3144 return false; 3145 } 3146 3147 @Override 3148 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 3149 return getClusterMetrics(EnumSet.allOf(Option.class)); 3150 } 3151 3152 @Override 3153 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 3154 return this.<ClusterMetrics> newMasterCaller() 3155 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 3156 ClusterMetrics> call(controller, stub, 3157 RequestConverter.buildGetClusterStatusRequest(options), 3158 (s, c, req, done) -> s.getClusterStatus(c, req, done), 3159 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 3160 .call(); 3161 } 3162 3163 @Override 3164 public CompletableFuture<Void> shutdown() { 3165 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3166 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 3167 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 3168 resp -> null)) 3169 .call(); 3170 } 3171 3172 @Override 3173 public CompletableFuture<Void> stopMaster() { 3174 return this.<Void> newMasterCaller().priority(HIGH_QOS) 3175 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 3176 controller, stub, StopMasterRequest.newBuilder().build(), 3177 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 3178 .call(); 3179 } 3180 3181 @Override 3182 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 3183 StopServerRequest request = RequestConverter 3184 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 3185 return this.<Void> newAdminCaller().priority(HIGH_QOS) 3186 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 3187 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 3188 resp -> null)) 3189 .serverName(serverName).call(); 3190 } 3191 3192 @Override 3193 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 3194 return this.<Void> newAdminCaller() 3195 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 3196 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 3197 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 3198 .serverName(serverName).call(); 3199 } 3200 3201 @Override 3202 public CompletableFuture<Void> updateConfiguration() { 3203 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3204 addListener( 3205 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 3206 (status, err) -> { 3207 if (err != null) { 3208 future.completeExceptionally(err); 3209 } else { 3210 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3211 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 3212 futures.add(updateConfiguration(status.getMasterName())); 3213 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 3214 addListener( 3215 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3216 (result, err2) -> { 3217 if (err2 != null) { 3218 future.completeExceptionally(err2); 3219 } else { 3220 future.complete(result); 3221 } 3222 }); 3223 } 3224 }); 3225 return future; 3226 } 3227 3228 @Override 3229 public CompletableFuture<Void> updateConfiguration(String groupName) { 3230 CompletableFuture<Void> future = new CompletableFuture<Void>(); 3231 addListener(getRSGroup(groupName), (rsGroupInfo, err) -> { 3232 if (err != null) { 3233 future.completeExceptionally(err); 3234 } else if (rsGroupInfo == null) { 3235 future.completeExceptionally( 3236 new IllegalArgumentException("Group does not exist: " + groupName)); 3237 } else { 3238 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> { 3239 if (err2 != null) { 3240 future.completeExceptionally(err2); 3241 } else { 3242 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3243 List<ServerName> groupServers = status.getServersName().stream() 3244 .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList()); 3245 groupServers.forEach(server -> futures.add(updateConfiguration(server))); 3246 addListener( 3247 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3248 (result, err3) -> { 3249 if (err3 != null) { 3250 future.completeExceptionally(err3); 3251 } else { 3252 future.complete(result); 3253 } 3254 }); 3255 } 3256 }); 3257 } 3258 }); 3259 return future; 3260 } 3261 3262 @Override 3263 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 3264 return this.<Void> newAdminCaller() 3265 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 3266 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 3267 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 3268 .serverName(serverName).call(); 3269 } 3270 3271 @Override 3272 public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() { 3273 return this 3274 .<RollAllWALWritersRequest, RollAllWALWritersResponse, 3275 Map<ServerName, 3276 Long>> procedureCall( 3277 RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()), 3278 (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(), 3279 result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap() 3280 .entrySet().stream().collect(Collectors 3281 .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)), 3282 new RollAllWALWritersBiConsumer()); 3283 } 3284 3285 @Override 3286 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 3287 return this.<Void> newAdminCaller() 3288 .action((controller, stub) -> this.<ClearCompactionQueuesRequest, 3289 ClearCompactionQueuesResponse, Void> adminCall(controller, stub, 3290 RequestConverter.buildClearCompactionQueuesRequest(queues), 3291 (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 3292 .serverName(serverName).call(); 3293 } 3294 3295 @Override 3296 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 3297 return this.<List<SecurityCapability>> newMasterCaller() 3298 .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, 3299 List<SecurityCapability>> call(controller, stub, 3300 SecurityCapabilitiesRequest.newBuilder().build(), 3301 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done), 3302 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList()))) 3303 .call(); 3304 } 3305 3306 @Override 3307 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 3308 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 3309 } 3310 3311 @Override 3312 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 3313 TableName tableName) { 3314 Preconditions.checkNotNull(tableName, 3315 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 3316 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 3317 } 3318 3319 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 3320 ServerName serverName) { 3321 return this.<List<RegionMetrics>> newAdminCaller() 3322 .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse, 3323 List<RegionMetrics>> adminCall(controller, stub, request, 3324 (s, c, req, done) -> s.getRegionLoad(controller, req, done), 3325 RegionMetricsBuilder::toRegionMetrics)) 3326 .serverName(serverName).call(); 3327 } 3328 3329 @Override 3330 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 3331 return this.<Boolean> newMasterCaller() 3332 .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, 3333 Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(), 3334 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 3335 resp -> resp.getInMaintenanceMode())) 3336 .call(); 3337 } 3338 3339 @Override 3340 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 3341 CompactType compactType) { 3342 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3343 3344 switch (compactType) { 3345 case MOB: 3346 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 3347 if (err != null) { 3348 future.completeExceptionally(err); 3349 return; 3350 } 3351 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 3352 3353 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 3354 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3355 GetRegionInfoResponse> adminCall(controller, stub, 3356 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 3357 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3358 .call(), (resp2, err2) -> { 3359 if (err2 != null) { 3360 future.completeExceptionally(err2); 3361 } else { 3362 if (resp2.hasCompactionState()) { 3363 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3364 } else { 3365 future.complete(CompactionState.NONE); 3366 } 3367 } 3368 }); 3369 }); 3370 break; 3371 case NORMAL: 3372 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3373 if (err != null) { 3374 future.completeExceptionally(err); 3375 return; 3376 } 3377 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 3378 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 3379 locations.stream().filter(loc -> loc.getServerName() != null) 3380 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 3381 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 3382 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 3383 // If any region compaction state is MAJOR_AND_MINOR 3384 // the table compaction state is MAJOR_AND_MINOR, too. 3385 if (err2 != null) { 3386 future.completeExceptionally(unwrapCompletionException(err2)); 3387 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 3388 future.complete(regionState); 3389 } else { 3390 regionStates.add(regionState); 3391 } 3392 })); 3393 }); 3394 addListener( 3395 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3396 (ret, err3) -> { 3397 // If future not completed, check all regions's compaction state 3398 if (!future.isCompletedExceptionally() && !future.isDone()) { 3399 CompactionState state = CompactionState.NONE; 3400 for (CompactionState regionState : regionStates) { 3401 switch (regionState) { 3402 case MAJOR: 3403 if (state == CompactionState.MINOR) { 3404 future.complete(CompactionState.MAJOR_AND_MINOR); 3405 } else { 3406 state = CompactionState.MAJOR; 3407 } 3408 break; 3409 case MINOR: 3410 if (state == CompactionState.MAJOR) { 3411 future.complete(CompactionState.MAJOR_AND_MINOR); 3412 } else { 3413 state = CompactionState.MINOR; 3414 } 3415 break; 3416 case NONE: 3417 default: 3418 } 3419 } 3420 if (!future.isDone()) { 3421 future.complete(state); 3422 } 3423 } 3424 }); 3425 }); 3426 break; 3427 default: 3428 throw new IllegalArgumentException("Unknown compactType: " + compactType); 3429 } 3430 3431 return future; 3432 } 3433 3434 @Override 3435 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 3436 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 3437 addListener(getRegionLocation(regionName), (location, err) -> { 3438 if (err != null) { 3439 future.completeExceptionally(err); 3440 return; 3441 } 3442 ServerName serverName = location.getServerName(); 3443 if (serverName == null) { 3444 future 3445 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 3446 return; 3447 } 3448 addListener( 3449 this.<GetRegionInfoResponse> newAdminCaller() 3450 .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse, 3451 GetRegionInfoResponse> adminCall(controller, stub, 3452 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 3453 true), 3454 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 3455 .serverName(serverName).call(), 3456 (resp2, err2) -> { 3457 if (err2 != null) { 3458 future.completeExceptionally(err2); 3459 } else { 3460 if (resp2.hasCompactionState()) { 3461 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 3462 } else { 3463 future.complete(CompactionState.NONE); 3464 } 3465 } 3466 }); 3467 }); 3468 return future; 3469 } 3470 3471 @Override 3472 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 3473 MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder() 3474 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 3475 return this.<Optional<Long>> newMasterCaller() 3476 .action((controller, stub) -> this.<MajorCompactionTimestampRequest, 3477 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request, 3478 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 3479 ProtobufUtil::toOptionalTimestamp)) 3480 .call(); 3481 } 3482 3483 @Override 3484 public CompletableFuture<Optional<Long>> 3485 getLastMajorCompactionTimestampForRegion(byte[] regionName) { 3486 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 3487 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 3488 addListener(getRegionInfo(regionName), (region, err) -> { 3489 if (err != null) { 3490 future.completeExceptionally(err); 3491 return; 3492 } 3493 MajorCompactionTimestampForRegionRequest.Builder builder = 3494 MajorCompactionTimestampForRegionRequest.newBuilder(); 3495 builder.setRegion( 3496 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 3497 addListener(this.<Optional<Long>> newMasterCaller() 3498 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3499 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3500 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3501 ProtobufUtil::toOptionalTimestamp)) 3502 .call(), (timestamp, err2) -> { 3503 if (err2 != null) { 3504 future.completeExceptionally(err2); 3505 } else { 3506 future.complete(timestamp); 3507 } 3508 }); 3509 }); 3510 return future; 3511 } 3512 3513 @Override 3514 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3515 List<String> serverNamesList) { 3516 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3517 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3518 if (err != null) { 3519 future.completeExceptionally(err); 3520 return; 3521 } 3522 // Accessed by multiple threads. 3523 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3524 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3525 serverNames.stream().forEach(serverName -> { 3526 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3527 if (err2 != null) { 3528 future.completeExceptionally(unwrapCompletionException(err2)); 3529 } else { 3530 serverStates.put(serverName, serverState); 3531 } 3532 })); 3533 }); 3534 addListener( 3535 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3536 (ret, err3) -> { 3537 if (!future.isCompletedExceptionally()) { 3538 if (err3 != null) { 3539 future.completeExceptionally(err3); 3540 } else { 3541 future.complete(serverStates); 3542 } 3543 } 3544 }); 3545 }); 3546 return future; 3547 } 3548 3549 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3550 CompletableFuture<List<ServerName>> future = new CompletableFuture<>(); 3551 if (serverNamesList.isEmpty()) { 3552 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3553 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3554 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3555 if (err != null) { 3556 future.completeExceptionally(err); 3557 } else { 3558 future.complete(clusterMetrics.getServersName()); 3559 } 3560 }); 3561 return future; 3562 } else { 3563 List<ServerName> serverList = new ArrayList<>(); 3564 for (String regionServerName : serverNamesList) { 3565 ServerName serverName = null; 3566 try { 3567 serverName = ServerName.valueOf(regionServerName); 3568 } catch (Exception e) { 3569 future.completeExceptionally( 3570 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3571 } 3572 if (serverName == null) { 3573 future.completeExceptionally( 3574 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3575 } else { 3576 serverList.add(serverName); 3577 } 3578 } 3579 future.complete(serverList); 3580 } 3581 return future; 3582 } 3583 3584 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3585 return this.<Boolean> newAdminCaller().serverName(serverName) 3586 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3587 Boolean> adminCall(controller, stub, 3588 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3589 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3590 .call(); 3591 } 3592 3593 @Override 3594 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3595 return this.<Boolean> newMasterCaller() 3596 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3597 Boolean> call(controller, stub, 3598 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3599 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3600 (resp) -> resp.getPrevBalanceValue())) 3601 .call(); 3602 } 3603 3604 @Override 3605 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3606 return this.<BalanceResponse> newMasterCaller() 3607 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3608 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3609 (s, c, req, done) -> s.balance(c, req, done), 3610 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3611 .call(); 3612 } 3613 3614 @Override 3615 public CompletableFuture<Boolean> isBalancerEnabled() { 3616 return this.<Boolean> newMasterCaller() 3617 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3618 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3619 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3620 .call(); 3621 } 3622 3623 @Override 3624 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3625 return this.<Boolean> newMasterCaller() 3626 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3627 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3628 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3629 (resp) -> resp.getPrevNormalizerValue())) 3630 .call(); 3631 } 3632 3633 @Override 3634 public CompletableFuture<Boolean> isNormalizerEnabled() { 3635 return this.<Boolean> newMasterCaller() 3636 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3637 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3638 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3639 .call(); 3640 } 3641 3642 @Override 3643 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3644 return normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3645 } 3646 3647 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3648 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3649 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3650 } 3651 3652 @Override 3653 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3654 return this.<Boolean> newMasterCaller() 3655 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3656 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3657 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3658 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3659 (resp) -> resp.getPrevValue())) 3660 .call(); 3661 } 3662 3663 @Override 3664 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3665 return this.<Boolean> newMasterCaller() 3666 .action( 3667 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3668 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3669 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3670 .call(); 3671 } 3672 3673 @Override 3674 public CompletableFuture<Boolean> runCleanerChore() { 3675 return this.<Boolean> newMasterCaller() 3676 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3677 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3678 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3679 (resp) -> resp.getCleanerChoreRan())) 3680 .call(); 3681 } 3682 3683 @Override 3684 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3685 return this.<Boolean> newMasterCaller() 3686 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3687 Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3688 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3689 .call(); 3690 } 3691 3692 @Override 3693 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3694 return this.<Boolean> newMasterCaller() 3695 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3696 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3697 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3698 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3699 .call(); 3700 } 3701 3702 @Override 3703 public CompletableFuture<Integer> runCatalogJanitor() { 3704 return this.<Integer> newMasterCaller() 3705 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3706 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3707 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3708 .call(); 3709 } 3710 3711 @Override 3712 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3713 ServiceCaller<S, R> callable) { 3714 MasterCoprocessorRpcChannelImpl channel = 3715 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3716 S stub = stubMaker.apply(channel); 3717 CompletableFuture<R> future = new CompletableFuture<>(); 3718 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3719 callable.call(stub, controller, resp -> { 3720 if (controller.failed()) { 3721 future.completeExceptionally(controller.getFailed()); 3722 } else { 3723 future.complete(resp); 3724 } 3725 }); 3726 return future; 3727 } 3728 3729 @Override 3730 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3731 ServiceCaller<S, R> callable, ServerName serverName) { 3732 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3733 this.<Message> newServerCaller().serverName(serverName)); 3734 S stub = stubMaker.apply(channel); 3735 CompletableFuture<R> future = new CompletableFuture<>(); 3736 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3737 callable.call(stub, controller, resp -> { 3738 if (controller.failed()) { 3739 future.completeExceptionally(controller.getFailed()); 3740 } else { 3741 future.complete(resp); 3742 } 3743 }); 3744 return future; 3745 } 3746 3747 @Override 3748 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3749 return this.<List<ServerName>> newMasterCaller() 3750 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3751 List<ServerName>> call(controller, stub, 3752 RequestConverter.buildClearDeadServersRequest(servers), 3753 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3754 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3755 .call(); 3756 } 3757 3758 <T> ServerRequestCallerBuilder<T> newServerCaller() { 3759 return this.connection.callerFactory.<T> serverRequest() 3760 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3761 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3762 .pause(pauseNs, TimeUnit.NANOSECONDS) 3763 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3764 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3765 } 3766 3767 @Override 3768 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3769 if (tableName == null) { 3770 return failedFuture(new IllegalArgumentException("Table name is null")); 3771 } 3772 CompletableFuture<Void> future = new CompletableFuture<>(); 3773 addListener(tableExists(tableName), (exist, err) -> { 3774 if (err != null) { 3775 future.completeExceptionally(err); 3776 return; 3777 } 3778 if (!exist) { 3779 future.completeExceptionally(new TableNotFoundException( 3780 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3781 return; 3782 } 3783 addListener(getTableSplits(tableName), (splits, err1) -> { 3784 if (err1 != null) { 3785 future.completeExceptionally(err1); 3786 } else { 3787 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3788 if (err2 != null) { 3789 future.completeExceptionally(err2); 3790 } else { 3791 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3792 if (err3 != null) { 3793 future.completeExceptionally(err3); 3794 } else { 3795 future.complete(result3); 3796 } 3797 }); 3798 } 3799 }); 3800 } 3801 }); 3802 }); 3803 return future; 3804 } 3805 3806 @Override 3807 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3808 if (tableName == null) { 3809 return failedFuture(new IllegalArgumentException("Table name is null")); 3810 } 3811 CompletableFuture<Void> future = new CompletableFuture<>(); 3812 addListener(tableExists(tableName), (exist, err) -> { 3813 if (err != null) { 3814 future.completeExceptionally(err); 3815 return; 3816 } 3817 if (!exist) { 3818 future.completeExceptionally(new TableNotFoundException( 3819 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3820 return; 3821 } 3822 addListener(setTableReplication(tableName, false), (result, err2) -> { 3823 if (err2 != null) { 3824 future.completeExceptionally(err2); 3825 } else { 3826 future.complete(result); 3827 } 3828 }); 3829 }); 3830 return future; 3831 } 3832 3833 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3834 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3835 addListener( 3836 getRegions(tableName).thenApply(regions -> regions.stream() 3837 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3838 (regions, err2) -> { 3839 if (err2 != null) { 3840 future.completeExceptionally(err2); 3841 return; 3842 } 3843 if (regions.size() == 1) { 3844 future.complete(null); 3845 } else { 3846 byte[][] splits = new byte[regions.size() - 1][]; 3847 for (int i = 1; i < regions.size(); i++) { 3848 splits[i - 1] = regions.get(i).getStartKey(); 3849 } 3850 future.complete(splits); 3851 } 3852 }); 3853 return future; 3854 } 3855 3856 /** 3857 * Connect to peer and check the table descriptor on peer: 3858 * <ol> 3859 * <li>Create the same table on peer when not exist.</li> 3860 * <li>Throw an exception if the table already has replication enabled on any of the column 3861 * families.</li> 3862 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3863 * </ol> 3864 * @param tableName name of the table to sync to the peer 3865 * @param splits table split keys 3866 */ 3867 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3868 byte[][] splits) { 3869 CompletableFuture<Void> future = new CompletableFuture<>(); 3870 addListener(listReplicationPeers(), (peers, err) -> { 3871 if (err != null) { 3872 future.completeExceptionally(err); 3873 return; 3874 } 3875 if (peers == null || peers.size() <= 0) { 3876 future.completeExceptionally( 3877 new IllegalArgumentException("Found no peer cluster for replication.")); 3878 return; 3879 } 3880 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3881 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3882 .forEach(peer -> { 3883 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3884 }); 3885 addListener( 3886 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3887 (result, err2) -> { 3888 if (err2 != null) { 3889 future.completeExceptionally(err2); 3890 } else { 3891 future.complete(result); 3892 } 3893 }); 3894 }); 3895 return future; 3896 } 3897 3898 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3899 ReplicationPeerDescription peer) { 3900 Configuration peerConf; 3901 try { 3902 peerConf = ReplicationPeerConfigUtil 3903 .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig()); 3904 } catch (IOException e) { 3905 return failedFuture(e); 3906 } 3907 URI connectionUri = 3908 ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey()); 3909 CompletableFuture<Void> future = new CompletableFuture<>(); 3910 addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> { 3911 if (err != null) { 3912 future.completeExceptionally(err); 3913 return; 3914 } 3915 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3916 if (err1 != null) { 3917 future.completeExceptionally(err1); 3918 return; 3919 } 3920 AsyncAdmin peerAdmin = conn.getAdmin(); 3921 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3922 if (err2 != null) { 3923 future.completeExceptionally(err2); 3924 return; 3925 } 3926 if (!exist) { 3927 CompletableFuture<Void> createTableFuture = null; 3928 if (splits == null) { 3929 createTableFuture = peerAdmin.createTable(tableDesc); 3930 } else { 3931 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3932 } 3933 addListener(createTableFuture, (result, err3) -> { 3934 if (err3 != null) { 3935 future.completeExceptionally(err3); 3936 } else { 3937 future.complete(result); 3938 } 3939 }); 3940 } else { 3941 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3942 (result, err4) -> { 3943 if (err4 != null) { 3944 future.completeExceptionally(err4); 3945 } else { 3946 future.complete(result); 3947 } 3948 }); 3949 } 3950 }); 3951 }); 3952 }); 3953 return future; 3954 } 3955 3956 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3957 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3958 CompletableFuture<Void> future = new CompletableFuture<>(); 3959 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3960 if (err != null) { 3961 future.completeExceptionally(err); 3962 return; 3963 } 3964 if (peerTableDesc == null) { 3965 future.completeExceptionally( 3966 new IllegalArgumentException("Failed to get table descriptor for table " 3967 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3968 return; 3969 } 3970 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3971 future.completeExceptionally(new IllegalArgumentException( 3972 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3973 + ", but the table descriptors are not same when compared with source cluster." 3974 + " Thus can not enable the table's replication switch.")); 3975 return; 3976 } 3977 future.complete(null); 3978 }); 3979 return future; 3980 } 3981 3982 /** 3983 * Set the table's replication switch if the table's replication switch is already not set. 3984 * @param tableName name of the table 3985 * @param enableRep is replication switch enable or disable 3986 */ 3987 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3988 CompletableFuture<Void> future = new CompletableFuture<>(); 3989 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3990 if (err != null) { 3991 future.completeExceptionally(err); 3992 return; 3993 } 3994 if (!tableDesc.matchReplicationScope(enableRep)) { 3995 int scope = 3996 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3997 TableDescriptor newTableDesc = 3998 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3999 addListener(modifyTable(newTableDesc), (result, err2) -> { 4000 if (err2 != null) { 4001 future.completeExceptionally(err2); 4002 } else { 4003 future.complete(result); 4004 } 4005 }); 4006 } else { 4007 future.complete(null); 4008 } 4009 }); 4010 return future; 4011 } 4012 4013 @Override 4014 public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) { 4015 GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); 4016 request.setPeerId(peerId); 4017 return this.<Boolean> newMasterCaller() 4018 .action((controller, stub) -> this.<GetReplicationPeerStateRequest, 4019 GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(), 4020 (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), 4021 resp -> resp.getIsEnabled())) 4022 .call(); 4023 } 4024 4025 private void waitUntilAllReplicationPeerModificationProceduresDone( 4026 CompletableFuture<Boolean> future, boolean prevOn, int retries) { 4027 CompletableFuture<List<ProcedureProtos.Procedure>> callFuture = 4028 this.<List<ProcedureProtos.Procedure>> newMasterCaller() 4029 .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest, 4030 GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call( 4031 controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), 4032 (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), 4033 resp -> resp.getProcedureList())) 4034 .call(); 4035 addListener(callFuture, (r, e) -> { 4036 if (e != null) { 4037 future.completeExceptionally(e); 4038 } else if (r.isEmpty()) { 4039 // we are done 4040 future.complete(prevOn); 4041 } else { 4042 // retry later to see if the procedures are done 4043 retryTimer.newTimeout( 4044 t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), 4045 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 4046 } 4047 }); 4048 } 4049 4050 @Override 4051 public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, 4052 boolean drainProcedures) { 4053 ReplicationPeerModificationSwitchRequest request = 4054 ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); 4055 CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller() 4056 .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest, 4057 ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request, 4058 (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), 4059 resp -> resp.getPreviousValue())) 4060 .call(); 4061 // if we do not need to wait all previous peer modification procedure done, or we are enabling 4062 // peer modification, just return here. 4063 if (!drainProcedures || on) { 4064 return callFuture; 4065 } 4066 // otherwise we need to wait until all previous peer modification procedure done 4067 CompletableFuture<Boolean> future = new CompletableFuture<>(); 4068 addListener(callFuture, (prevOn, err) -> { 4069 if (err != null) { 4070 future.completeExceptionally(err); 4071 return; 4072 } 4073 // even if the previous state is disabled, we still need to wait here, as there could be 4074 // another client thread which called this method just before us and have already changed the 4075 // state to off, but there are still peer modification procedures not finished, so we should 4076 // also wait here. 4077 waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); 4078 }); 4079 return future; 4080 } 4081 4082 @Override 4083 public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() { 4084 return this.<Boolean> newMasterCaller() 4085 .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest, 4086 IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub, 4087 IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), 4088 (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), 4089 (resp) -> resp.getEnabled())) 4090 .call(); 4091 } 4092 4093 @Override 4094 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 4095 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 4096 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 4097 if (err != null) { 4098 future.completeExceptionally(err); 4099 return; 4100 } 4101 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 4102 locations.stream().filter(l -> l.getRegion() != null) 4103 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 4104 .collect(Collectors.groupingBy(l -> l.getServerName(), 4105 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 4106 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 4107 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 4108 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 4109 futures 4110 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 4111 if (err2 != null) { 4112 future.completeExceptionally(unwrapCompletionException(err2)); 4113 } else { 4114 aggregator.append(stats); 4115 } 4116 })); 4117 } 4118 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 4119 (ret, err3) -> { 4120 if (err3 != null) { 4121 future.completeExceptionally(unwrapCompletionException(err3)); 4122 } else { 4123 future.complete(aggregator.sum()); 4124 } 4125 }); 4126 }); 4127 return future; 4128 } 4129 4130 @Override 4131 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 4132 boolean preserveSplits) { 4133 CompletableFuture<Void> future = new CompletableFuture<>(); 4134 addListener(tableExists(tableName), (exist, err) -> { 4135 if (err != null) { 4136 future.completeExceptionally(err); 4137 return; 4138 } 4139 if (!exist) { 4140 future.completeExceptionally(new TableNotFoundException(tableName)); 4141 return; 4142 } 4143 addListener(tableExists(newTableName), (exist1, err1) -> { 4144 if (err1 != null) { 4145 future.completeExceptionally(err1); 4146 return; 4147 } 4148 if (exist1) { 4149 future.completeExceptionally(new TableExistsException(newTableName)); 4150 return; 4151 } 4152 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 4153 if (err2 != null) { 4154 future.completeExceptionally(err2); 4155 return; 4156 } 4157 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 4158 if (preserveSplits) { 4159 addListener(getTableSplits(tableName), (splits, err3) -> { 4160 if (err3 != null) { 4161 future.completeExceptionally(err3); 4162 } else { 4163 addListener( 4164 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc), 4165 (result, err4) -> { 4166 if (err4 != null) { 4167 future.completeExceptionally(err4); 4168 } else { 4169 future.complete(result); 4170 } 4171 }); 4172 } 4173 }); 4174 } else { 4175 addListener(createTable(newTableDesc), (result, err5) -> { 4176 if (err5 != null) { 4177 future.completeExceptionally(err5); 4178 } else { 4179 future.complete(result); 4180 } 4181 }); 4182 } 4183 }); 4184 }); 4185 }); 4186 return future; 4187 } 4188 4189 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 4190 List<RegionInfo> hris) { 4191 return this.<CacheEvictionStats> newAdminCaller() 4192 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 4193 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 4194 RequestConverter.buildClearRegionBlockCacheRequest(hris), 4195 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 4196 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 4197 .serverName(serverName).call(); 4198 } 4199 4200 @Override 4201 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 4202 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4203 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 4204 Boolean> call(controller, stub, 4205 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 4206 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 4207 resp -> resp.getPreviousRpcThrottleEnabled())) 4208 .call(); 4209 return future; 4210 } 4211 4212 @Override 4213 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 4214 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4215 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 4216 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 4217 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done), 4218 resp -> resp.getRpcThrottleEnabled())) 4219 .call(); 4220 return future; 4221 } 4222 4223 @Override 4224 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 4225 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 4226 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 4227 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 4228 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 4229 .build(), 4230 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 4231 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 4232 .call(); 4233 return future; 4234 } 4235 4236 @Override 4237 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 4238 return this.<Map<TableName, Long>> newMasterCaller() 4239 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 4240 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 4241 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 4242 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 4243 resp -> resp.getSizesList().stream().collect(Collectors 4244 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 4245 .call(); 4246 } 4247 4248 @Override 4249 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 4250 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 4251 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 4252 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 4253 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 4254 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 4255 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 4256 resp -> resp.getSnapshotsList().stream() 4257 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()), 4258 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 4259 .serverName(serverName).call(); 4260 } 4261 4262 private CompletableFuture<SpaceQuotaSnapshot> 4263 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 4264 return this.<SpaceQuotaSnapshot> newMasterCaller() 4265 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 4266 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 4267 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 4268 .call(); 4269 } 4270 4271 @Override 4272 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 4273 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 4274 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 4275 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4276 } 4277 4278 @Override 4279 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 4280 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 4281 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 4282 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 4283 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 4284 } 4285 4286 @Override 4287 public CompletableFuture<Void> grant(UserPermission userPermission, 4288 boolean mergeExistingPermissions) { 4289 return this.<Void> newMasterCaller() 4290 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 4291 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 4292 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 4293 .call(); 4294 } 4295 4296 @Override 4297 public CompletableFuture<Void> revoke(UserPermission userPermission) { 4298 return this.<Void> newMasterCaller() 4299 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 4300 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 4301 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 4302 .call(); 4303 } 4304 4305 @Override 4306 public CompletableFuture<List<UserPermission>> 4307 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 4308 return this.<List<UserPermission>> newMasterCaller() 4309 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 4310 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 4311 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 4312 (s, c, req, done) -> s.getUserPermissions(c, req, done), 4313 resp -> resp.getUserPermissionList().stream() 4314 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 4315 .collect(Collectors.toList()))) 4316 .call(); 4317 } 4318 4319 @Override 4320 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 4321 List<Permission> permissions) { 4322 return this.<List<Boolean>> newMasterCaller() 4323 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 4324 List<Boolean>> call(controller, stub, 4325 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 4326 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 4327 resp -> resp.getHasUserPermissionList())) 4328 .call(); 4329 } 4330 4331 @Override 4332 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 4333 return this.<Boolean> newMasterCaller() 4334 .action((controller, stub) -> this.call(controller, stub, 4335 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 4336 MasterService.Interface::switchSnapshotCleanup, 4337 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 4338 .call(); 4339 } 4340 4341 @Override 4342 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 4343 return this.<Boolean> newMasterCaller() 4344 .action((controller, stub) -> this.call(controller, stub, 4345 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 4346 MasterService.Interface::isSnapshotCleanupEnabled, 4347 IsSnapshotCleanupEnabledResponse::getEnabled)) 4348 .call(); 4349 } 4350 4351 @Override 4352 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) { 4353 return this.<Void> newMasterCaller() 4354 .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call( 4355 controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName), 4356 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null)) 4357 .call(); 4358 } 4359 4360 @Override 4361 public CompletableFuture<Void> addRSGroup(String groupName) { 4362 return this.<Void> newMasterCaller() 4363 .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call( 4364 controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4365 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)) 4366 .call(); 4367 } 4368 4369 @Override 4370 public CompletableFuture<Void> removeRSGroup(String groupName) { 4371 return this.<Void> newMasterCaller() 4372 .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call( 4373 controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(), 4374 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null)) 4375 .call(); 4376 } 4377 4378 @Override 4379 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName, 4380 BalanceRequest request) { 4381 return this.<BalanceResponse> newMasterCaller() 4382 .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, 4383 BalanceResponse> call(controller, stub, 4384 ProtobufUtil.createBalanceRSGroupRequest(groupName, request), 4385 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse)) 4386 .call(); 4387 } 4388 4389 @Override 4390 public CompletableFuture<List<RSGroupInfo>> listRSGroups() { 4391 return this.<List<RSGroupInfo>> newMasterCaller() 4392 .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse, 4393 List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(), 4394 (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList() 4395 .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList()))) 4396 .call(); 4397 } 4398 4399 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 4400 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 4401 final String logType) { 4402 if (CollectionUtils.isEmpty(serverNames)) { 4403 return CompletableFuture.completedFuture(Collections.emptyList()); 4404 } 4405 return CompletableFuture.supplyAsync(() -> serverNames 4406 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 4407 filterParams, limit, logType)) 4408 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 4409 } 4410 4411 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 4412 Map<String, Object> filterParams, int limit, String logType) { 4413 return this.<List<LogEntry>> newAdminCaller() 4414 .action((controller, stub) -> this.adminCall(controller, stub, 4415 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 4416 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 4417 .serverName(serverName).call(); 4418 } 4419 4420 @Override 4421 public CompletableFuture<List<Boolean>> 4422 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 4423 if (CollectionUtils.isEmpty(serverNames)) { 4424 return CompletableFuture.completedFuture(Collections.emptyList()); 4425 } 4426 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 4427 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 4428 return convertToFutureOfList(clearSlowLogResponseList); 4429 } 4430 4431 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 4432 return this.<Boolean> newAdminCaller() 4433 .action((controller, stub) -> this.adminCall(controller, stub, 4434 RequestConverter.buildClearSlowLogResponseRequest(), 4435 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 4436 .serverName(serverName).call(); 4437 } 4438 4439 private static <T> CompletableFuture<List<T>> 4440 convertToFutureOfList(List<CompletableFuture<T>> futures) { 4441 CompletableFuture<Void> allDoneFuture = 4442 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 4443 return allDoneFuture 4444 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 4445 } 4446 4447 @Override 4448 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) { 4449 return this.<List<TableName>> newMasterCaller() 4450 .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, 4451 List<TableName>> call(controller, stub, 4452 ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(), 4453 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList() 4454 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()))) 4455 .call(); 4456 } 4457 4458 @Override 4459 public CompletableFuture<Pair<List<String>, List<TableName>>> 4460 getConfiguredNamespacesAndTablesInRSGroup(String groupName) { 4461 return this.<Pair<List<String>, List<TableName>>> newMasterCaller() 4462 .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest, 4463 GetConfiguredNamespacesAndTablesInRSGroupResponse, 4464 Pair<List<String>, List<TableName>>> call(controller, stub, 4465 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName) 4466 .build(), 4467 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done), 4468 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream() 4469 .map(ProtobufUtil::toTableName).collect(Collectors.toList())))) 4470 .call(); 4471 } 4472 4473 @Override 4474 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) { 4475 return this.<RSGroupInfo> newMasterCaller() 4476 .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest, 4477 GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub, 4478 GetRSGroupInfoOfServerRequest.newBuilder() 4479 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname()) 4480 .setPort(hostPort.getPort()).build()) 4481 .build(), 4482 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done), 4483 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4484 .call(); 4485 } 4486 4487 @Override 4488 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) { 4489 return this.<Void> newMasterCaller() 4490 .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call( 4491 controller, stub, RequestConverter.buildRemoveServersRequest(servers), 4492 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null)) 4493 .call(); 4494 } 4495 4496 @Override 4497 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) { 4498 CompletableFuture<Void> future = new CompletableFuture<>(); 4499 for (TableName tableName : tables) { 4500 addListener(tableExists(tableName), (exist, err) -> { 4501 if (err != null) { 4502 future.completeExceptionally(err); 4503 return; 4504 } 4505 if (!exist) { 4506 future.completeExceptionally(new TableNotFoundException(tableName)); 4507 return; 4508 } 4509 }); 4510 } 4511 addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> { 4512 if (err != null) { 4513 future.completeExceptionally(err); 4514 return; 4515 } 4516 if (tableDescriptions == null || tableDescriptions.isEmpty()) { 4517 future.complete(null); 4518 return; 4519 } 4520 List<TableDescriptor> newTableDescriptors = new ArrayList<>(); 4521 for (TableDescriptor td : tableDescriptions) { 4522 newTableDescriptors 4523 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build()); 4524 } 4525 addListener( 4526 CompletableFuture.allOf( 4527 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)), 4528 (v, e) -> { 4529 if (e != null) { 4530 future.completeExceptionally(e); 4531 } else { 4532 future.complete(v); 4533 } 4534 }); 4535 }); 4536 return future; 4537 } 4538 4539 @Override 4540 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) { 4541 return this.<RSGroupInfo> newMasterCaller() 4542 .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest, 4543 GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub, 4544 GetRSGroupInfoOfTableRequest.newBuilder() 4545 .setTableName(ProtobufUtil.toProtoTableName(table)).build(), 4546 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done), 4547 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4548 .call(); 4549 } 4550 4551 @Override 4552 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) { 4553 return this.<RSGroupInfo> newMasterCaller() 4554 .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse, 4555 RSGroupInfo> call(controller, stub, 4556 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(), 4557 (s, c, req, done) -> s.getRSGroupInfo(c, req, done), 4558 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)) 4559 .call(); 4560 } 4561 4562 @Override 4563 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) { 4564 return this.<Void> newMasterCaller() 4565 .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call( 4566 controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName) 4567 .setNewRsgroupName(newName).build(), 4568 (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null)) 4569 .call(); 4570 } 4571 4572 @Override 4573 public CompletableFuture<Void> updateRSGroupConfig(String groupName, 4574 Map<String, String> configuration) { 4575 UpdateRSGroupConfigRequest.Builder request = 4576 UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName); 4577 if (configuration != null) { 4578 configuration.entrySet().forEach(e -> request.addConfiguration( 4579 NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build())); 4580 } 4581 return this.<Void> newMasterCaller() 4582 .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, 4583 Void> call(controller, stub, request.build(), 4584 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null)) 4585 .call(); 4586 } 4587 4588 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 4589 return this.<List<LogEntry>> newMasterCaller() 4590 .action((controller, stub) -> this.call(controller, stub, 4591 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 4592 ProtobufUtil::toBalancerDecisionResponse)) 4593 .call(); 4594 } 4595 4596 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 4597 return this.<List<LogEntry>> newMasterCaller() 4598 .action((controller, stub) -> this.call(controller, stub, 4599 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 4600 ProtobufUtil::toBalancerRejectionResponse)) 4601 .call(); 4602 } 4603 4604 @Override 4605 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 4606 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 4607 if (logType == null || serverType == null) { 4608 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 4609 } 4610 switch (logType) { 4611 case "SLOW_LOG": 4612 case "LARGE_LOG": 4613 if (ServerType.MASTER.equals(serverType)) { 4614 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 4615 } 4616 return getSlowLogResponses(filterParams, serverNames, limit, logType); 4617 case "BALANCER_DECISION": 4618 if (ServerType.REGION_SERVER.equals(serverType)) { 4619 throw new IllegalArgumentException( 4620 "Balancer Decision logs are not maintained by HRegionServer"); 4621 } 4622 return getBalancerDecisions(limit); 4623 case "BALANCER_REJECTION": 4624 if (ServerType.REGION_SERVER.equals(serverType)) { 4625 throw new IllegalArgumentException( 4626 "Balancer Rejection logs are not maintained by HRegionServer"); 4627 } 4628 return getBalancerRejections(limit); 4629 default: 4630 return CompletableFuture.completedFuture(Collections.emptyList()); 4631 } 4632 } 4633 4634 @Override 4635 public CompletableFuture<Void> flushMasterStore() { 4636 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4637 return this.<Void> newMasterCaller() 4638 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4639 Void> call(controller, stub, request.build(), 4640 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4641 .call(); 4642 } 4643 4644 @Override 4645 public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) { 4646 GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder(); 4647 return this.<List<String>> newAdminCaller() 4648 .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse, 4649 List<String>> adminCall(controller, stub, request.build(), 4650 (s, c, req, done) -> s.getCachedFilesList(c, req, done), 4651 resp -> resp.getCachedFilesList())) 4652 .serverName(serverName).call(); 4653 } 4654 4655 @Override 4656 public CompletableFuture<Void> restoreBackupSystemTable(String snapshotName) { 4657 MasterProtos.RestoreBackupSystemTableRequest request = 4658 MasterProtos.RestoreBackupSystemTableRequest.newBuilder().setSnapshotName(snapshotName) 4659 .build(); 4660 return this.<MasterProtos.RestoreBackupSystemTableRequest, 4661 MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request, 4662 MasterService.Interface::restoreBackupSystemTable, 4663 MasterProtos.RestoreBackupSystemTableResponse::getProcId, 4664 new RestoreBackupSystemTableProcedureBiConsumer()); 4665 } 4666}