001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 024 025import com.google.protobuf.Message; 026import com.google.protobuf.RpcChannel; 027import edu.umd.cs.findbugs.annotations.Nullable; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.EnumSet; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Optional; 037import java.util.Set; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentLinkedQueue; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicReference; 043import java.util.function.BiConsumer; 044import java.util.function.Consumer; 045import java.util.function.Function; 046import 
java.util.function.Supplier; 047import java.util.regex.Pattern; 048import java.util.stream.Collectors; 049import java.util.stream.Stream; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.AsyncMetaTableAccessor; 052import org.apache.hadoop.hbase.CacheEvictionStats; 053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.ClusterMetricsBuilder; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.HRegionLocation; 059import org.apache.hadoop.hbase.MetaTableAccessor; 060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; 061import org.apache.hadoop.hbase.NamespaceDescriptor; 062import org.apache.hadoop.hbase.RegionLocations; 063import org.apache.hadoop.hbase.RegionMetrics; 064import org.apache.hadoop.hbase.RegionMetricsBuilder; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableExistsException; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.TableNotDisabledException; 069import org.apache.hadoop.hbase.TableNotEnabledException; 070import org.apache.hadoop.hbase.TableNotFoundException; 071import org.apache.hadoop.hbase.UnknownRegionException; 072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 075import org.apache.hadoop.hbase.client.Scan.ReadType; 076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 077import org.apache.hadoop.hbase.client.replication.TableCFs; 078import org.apache.hadoop.hbase.client.security.SecurityCapability; 079import org.apache.hadoop.hbase.exceptions.DeserializationException; 080import 
org.apache.hadoop.hbase.ipc.HBaseRpcController; 081import org.apache.hadoop.hbase.quotas.QuotaFilter; 082import org.apache.hadoop.hbase.quotas.QuotaSettings; 083import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 085import org.apache.hadoop.hbase.replication.ReplicationException; 086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest; 089import org.apache.hadoop.hbase.security.access.Permission; 090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; 091import org.apache.hadoop.hbase.security.access.UserPermission; 092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 095import org.apache.hadoop.hbase.util.Bytes; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 098import org.apache.yetus.audience.InterfaceAudience; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101 102import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 103import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 104import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 105import org.apache.hbase.thirdparty.io.netty.util.Timeout; 106import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 107import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 108 109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 110import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 112import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 132import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 153import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 173import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 193import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 212import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 231import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 251import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 271import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; 288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 289import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 290import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 295import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 296import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 297import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 298import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 299import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 300import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 301import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 302import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 303import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 304import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 305 306/** 307 * The implementation of AsyncAdmin. 308 * <p> 309 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 310 * be finished inside the rpc framework thread, which means that the callbacks registered to the 311 * {@link CompletableFuture} will also be executed inside the rpc framework thread. 
So users who use 312 * this class should not try to do time consuming tasks in the callbacks. 313 * @since 2.0.0 314 * @see AsyncHBaseAdmin 315 * @see AsyncConnection#getAdmin() 316 * @see AsyncConnection#getAdminBuilder() 317 */ 318@InterfaceAudience.Private 319class RawAsyncHBaseAdmin implements AsyncAdmin { 320 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 321 322 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 323 324 private final AsyncConnectionImpl connection; 325 326 private final HashedWheelTimer retryTimer; 327 328 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 329 330 private final long rpcTimeoutNs; 331 332 private final long operationTimeoutNs; 333 334 private final long pauseNs; 335 336 private final long pauseNsForServerOverloaded; 337 338 private final int maxAttempts; 339 340 private final int startLogErrorsCnt; 341 342 private final NonceGenerator ng; 343 344 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 345 AsyncAdminBuilderBase builder) { 346 this.connection = connection; 347 this.retryTimer = retryTimer; 348 this.metaTable = connection.getTable(META_TABLE_NAME); 349 this.rpcTimeoutNs = builder.rpcTimeoutNs; 350 this.operationTimeoutNs = builder.operationTimeoutNs; 351 this.pauseNs = builder.pauseNs; 352 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 353 LOG.warn( 354 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 355 + " the normal pause value {} ms, use the greater one instead", 356 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 357 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 358 this.pauseNsForServerOverloaded = builder.pauseNs; 359 } else { 360 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 361 } 362 this.maxAttempts = builder.maxAttempts; 363 this.startLogErrorsCnt = builder.startLogErrorsCnt; 364 this.ng = 
connection.getNonceGenerator(); 365 } 366 367 private <T> MasterRequestCallerBuilder<T> newMasterCaller() { 368 return this.connection.callerFactory.<T> masterRequest() 369 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 370 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 371 .pause(pauseNs, TimeUnit.NANOSECONDS) 372 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 373 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 374 } 375 376 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 377 return this.connection.callerFactory.<T> adminRequest() 378 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 379 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 380 .pause(pauseNs, TimeUnit.NANOSECONDS) 381 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 382 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 383 } 384 385 @FunctionalInterface 386 private interface MasterRpcCall<RESP, REQ> { 387 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 388 RpcCallback<RESP> done); 389 } 390 391 @FunctionalInterface 392 private interface AdminRpcCall<RESP, REQ> { 393 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 394 RpcCallback<RESP> done); 395 } 396 397 @FunctionalInterface 398 private interface Converter<D, S> { 399 D convert(S src) throws IOException; 400 } 401 402 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 403 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 404 Converter<RESP, PRESP> respConverter) { 405 CompletableFuture<RESP> future = new CompletableFuture<>(); 406 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 407 408 @Override 409 public void run(PRESP resp) { 410 if (controller.failed()) { 411 future.completeExceptionally(controller.getFailed()); 412 } else { 413 try { 414 future.complete(respConverter.convert(resp)); 415 
} catch (IOException e) { 416 future.completeExceptionally(e); 417 } 418 } 419 } 420 }); 421 return future; 422 } 423 424 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 425 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 426 Converter<RESP, PRESP> respConverter) { 427 CompletableFuture<RESP> future = new CompletableFuture<>(); 428 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 429 430 @Override 431 public void run(PRESP resp) { 432 if (controller.failed()) { 433 future.completeExceptionally(controller.getFailed()); 434 } else { 435 try { 436 future.complete(respConverter.convert(resp)); 437 } catch (IOException e) { 438 future.completeExceptionally(e); 439 } 440 } 441 } 442 }); 443 return future; 444 } 445 446 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 447 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 448 ProcedureBiConsumer consumer) { 449 return procedureCall(b -> { 450 }, preq, rpcCall, respConverter, consumer); 451 } 452 453 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, 454 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 455 ProcedureBiConsumer consumer) { 456 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); 457 } 458 459 private <PREQ, PRESP> CompletableFuture<Void> procedureCall( 460 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, 461 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 462 ProcedureBiConsumer consumer) { 463 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, 464 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); 465 prioritySetter.accept(builder); 466 CompletableFuture<Long> procFuture = builder.call(); 467 CompletableFuture<Void> future = 
waitProcedureResult(procFuture); 468 addListener(future, consumer); 469 return future; 470 } 471 472 @Override 473 public CompletableFuture<Boolean> tableExists(TableName tableName) { 474 if (TableName.isMetaTableName(tableName)) { 475 return CompletableFuture.completedFuture(true); 476 } 477 return AsyncMetaTableAccessor.tableExists(metaTable, tableName); 478 } 479 480 @Override 481 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 482 return getTableDescriptors( 483 RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables)); 484 } 485 486 /** 487 * {@link #listTableDescriptors(boolean)} 488 */ 489 @Override 490 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 491 boolean includeSysTables) { 492 Preconditions.checkNotNull(pattern, 493 "pattern is null. If you don't specify a pattern, use listTables(boolean) instead"); 494 return getTableDescriptors( 495 RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables)); 496 } 497 498 @Override 499 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) { 500 Preconditions.checkNotNull(tableNames, 501 "tableNames is null. 
If you don't specify tableNames, " + "use listTables(boolean) instead"); 502 if (tableNames.isEmpty()) { 503 return CompletableFuture.completedFuture(Collections.emptyList()); 504 } 505 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames)); 506 } 507 508 private CompletableFuture<List<TableDescriptor>> 509 getTableDescriptors(GetTableDescriptorsRequest request) { 510 return this.<List<TableDescriptor>> newMasterCaller() 511 .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 512 List<TableDescriptor>> call(controller, stub, request, 513 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 514 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 515 .call(); 516 } 517 518 @Override 519 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 520 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 521 } 522 523 @Override 524 public CompletableFuture<List<TableName>> listTableNames(Pattern pattern, 525 boolean includeSysTables) { 526 Preconditions.checkNotNull(pattern, 527 "pattern is null. 
If you don't specify a pattern, use listTableNames(boolean) instead"); 528 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 529 } 530 531 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 532 return this.<List<TableName>> newMasterCaller() 533 .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse, 534 List<TableName>> call(controller, stub, request, 535 (s, c, req, done) -> s.getTableNames(c, req, done), 536 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) 537 .call(); 538 } 539 540 @Override 541 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 542 return this.<List<TableDescriptor>> newMasterCaller() 543 .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest, 544 ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub, 545 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 546 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 547 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 548 .call(); 549 } 550 551 @Override 552 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 553 return this.<List<TableName>> newMasterCaller() 554 .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest, 555 ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub, 556 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 557 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 558 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 559 .call(); 560 } 561 562 @Override 563 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 564 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 565 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName) 566 
.action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 567 List<TableSchema>> call(controller, stub, 568 RequestConverter.buildGetTableDescriptorsRequest(tableName), 569 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 570 (resp) -> resp.getTableSchemaList())) 571 .call(), (tableSchemas, error) -> { 572 if (error != null) { 573 future.completeExceptionally(error); 574 return; 575 } 576 if (!tableSchemas.isEmpty()) { 577 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 578 } else { 579 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 580 } 581 }); 582 return future; 583 } 584 585 @Override 586 public CompletableFuture<Void> createTable(TableDescriptor desc) { 587 return createTable(desc.getTableName(), 588 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 589 } 590 591 @Override 592 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 593 int numRegions) { 594 try { 595 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 596 } catch (IllegalArgumentException e) { 597 return failedFuture(e); 598 } 599 } 600 601 @Override 602 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 603 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 604 + " use createTable(TableDescriptor) instead"); 605 try { 606 verifySplitKeys(splitKeys); 607 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 608 splitKeys, ng.getNonceGroup(), ng.newNonce())); 609 } catch (IllegalArgumentException e) { 610 return failedFuture(e); 611 } 612 } 613 614 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 615 Preconditions.checkNotNull(tableName, "table name is null"); 616 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request, 617 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 618 new CreateTableProcedureBiConsumer(tableName)); 619 } 620 621 @Override 622 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 623 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(), 624 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 625 ng.newNonce()), 626 (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), 627 new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 628 } 629 630 @Override 631 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) { 632 return this.<ModifyTableStoreFileTrackerRequest, 633 ModifyTableStoreFileTrackerResponse> procedureCall(tableName, 634 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT, 635 ng.getNonceGroup(), ng.newNonce()), 636 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done), 637 (resp) -> resp.getProcId(), 638 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName)); 639 } 640 641 @Override 642 public CompletableFuture<Void> deleteTable(TableName tableName) { 643 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName, 644 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), 
ng.newNonce()), 645 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 646 new DeleteTableProcedureBiConsumer(tableName)); 647 } 648 649 @Override 650 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 651 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName, 652 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 653 ng.newNonce()), 654 (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), 655 new TruncateTableProcedureBiConsumer(tableName)); 656 } 657 658 @Override 659 public CompletableFuture<Void> enableTable(TableName tableName) { 660 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName, 661 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 662 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 663 new EnableTableProcedureBiConsumer(tableName)); 664 } 665 666 @Override 667 public CompletableFuture<Void> disableTable(TableName tableName) { 668 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName, 669 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 670 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 671 new DisableTableProcedureBiConsumer(tableName)); 672 } 673 674 /** 675 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using 676 * passed parameters. Sets error or boolean result ('true' if table matches the passed-in 677 * targetState). 
678 */ 679 private static CompletableFuture<Boolean> completeCheckTableState( 680 CompletableFuture<Boolean> future, TableState tableState, Throwable error, 681 TableState.State targetState, TableName tableName) { 682 if (error != null) { 683 future.completeExceptionally(error); 684 } else { 685 if (tableState != null) { 686 future.complete(tableState.inStates(targetState)); 687 } else { 688 future.completeExceptionally(new TableNotFoundException(tableName)); 689 } 690 } 691 return future; 692 } 693 694 @Override 695 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 696 if (TableName.isMetaTableName(tableName)) { 697 return CompletableFuture.completedFuture(true); 698 } 699 CompletableFuture<Boolean> future = new CompletableFuture<>(); 700 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 701 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 702 TableState.State.ENABLED, tableName); 703 }); 704 return future; 705 } 706 707 @Override 708 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 709 if (TableName.isMetaTableName(tableName)) { 710 return CompletableFuture.completedFuture(false); 711 } 712 CompletableFuture<Boolean> future = new CompletableFuture<>(); 713 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> { 714 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error, 715 TableState.State.DISABLED, tableName); 716 }); 717 return future; 718 } 719 720 @Override 721 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 722 return isTableAvailable(tableName, Optional.empty()); 723 } 724 725 @Override 726 public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) { 727 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 728 + " use isTableAvailable(TableName) instead"); 729 return isTableAvailable(tableName, Optional.of(splitKeys)); 730 } 731 732 private CompletableFuture<Boolean> isTableAvailable(TableName tableName, 733 Optional<byte[][]> splitKeys) { 734 if (TableName.isMetaTableName(tableName)) { 735 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream 736 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 737 } 738 CompletableFuture<Boolean> future = new CompletableFuture<>(); 739 addListener(isTableEnabled(tableName), (enabled, error) -> { 740 if (error != null) { 741 if (error instanceof TableNotFoundException) { 742 future.complete(false); 743 } else { 744 future.completeExceptionally(error); 745 } 746 return; 747 } 748 if (!enabled) { 749 future.complete(false); 750 } else { 751 addListener(AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 752 (locations, error1) -> { 753 if (error1 != null) { 754 future.completeExceptionally(error1); 755 return; 756 } 757 List<HRegionLocation> notDeployedRegions = locations.stream() 758 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 759 if (notDeployedRegions.size() > 0) { 760 if (LOG.isDebugEnabled()) { 761 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 762 } 763 future.complete(false); 764 return; 765 } 766 767 Optional<Boolean> available = 768 splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); 769 future.complete(available.orElse(true)); 770 }); 771 } 772 }); 773 return future; 774 } 775 776 private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) { 777 int regionCount = 0; 778 for (HRegionLocation location : locations) { 779 RegionInfo info = location.getRegion(); 780 if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 781 regionCount++; 782 continue; 783 } 784 for (byte[] 
splitKey : splitKeys) { 785 // Just check if the splitkey is available 786 if (Bytes.equals(info.getStartKey(), splitKey)) { 787 regionCount++; 788 break; 789 } 790 } 791 } 792 return regionCount == splitKeys.length + 1; 793 } 794 795 @Override 796 public CompletableFuture<Void> addColumnFamily(TableName tableName, 797 ColumnFamilyDescriptor columnFamily) { 798 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName, 799 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 800 ng.newNonce()), 801 (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 802 new AddColumnFamilyProcedureBiConsumer(tableName)); 803 } 804 805 @Override 806 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 807 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName, 808 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 809 ng.newNonce()), 810 (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), 811 new DeleteColumnFamilyProcedureBiConsumer(tableName)); 812 } 813 814 @Override 815 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 816 ColumnFamilyDescriptor columnFamily) { 817 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName, 818 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 819 ng.newNonce()), 820 (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), 821 new ModifyColumnFamilyProcedureBiConsumer(tableName)); 822 } 823 824 @Override 825 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName, 826 byte[] family, String dstSFT) { 827 return this.<ModifyColumnStoreFileTrackerRequest, 828 ModifyColumnStoreFileTrackerResponse> procedureCall(tableName, 829 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT, 830 
ng.getNonceGroup(), ng.newNonce()), 831 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done), 832 (resp) -> resp.getProcId(), 833 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName)); 834 } 835 836 @Override 837 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 838 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 839 RequestConverter.buildCreateNamespaceRequest(descriptor), 840 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 841 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 842 } 843 844 @Override 845 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 846 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 847 RequestConverter.buildModifyNamespaceRequest(descriptor), 848 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 849 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 850 } 851 852 @Override 853 public CompletableFuture<Void> deleteNamespace(String name) { 854 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 855 RequestConverter.buildDeleteNamespaceRequest(name), 856 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 857 new DeleteNamespaceProcedureBiConsumer(name)); 858 } 859 860 @Override 861 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 862 return this.<NamespaceDescriptor> newMasterCaller() 863 .action((controller, stub) -> this.<GetNamespaceDescriptorRequest, 864 GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub, 865 RequestConverter.buildGetNamespaceDescriptorRequest(name), 866 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), 867 (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))) 868 .call(); 869 } 870 871 @Override 872 public CompletableFuture<List<String>> 
listNamespaces() { 873 return this.<List<String>> newMasterCaller() 874 .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse, 875 List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(), 876 (s, c, req, done) -> s.listNamespaces(c, req, done), 877 (resp) -> resp.getNamespaceNameList())) 878 .call(); 879 } 880 881 @Override 882 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 883 return this.<List<NamespaceDescriptor>> newMasterCaller() 884 .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest, 885 ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub, 886 ListNamespaceDescriptorsRequest.newBuilder().build(), 887 (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), 888 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))) 889 .call(); 890 } 891 892 @Override 893 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 894 return this.<List<RegionInfo>> newAdminCaller() 895 .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse, 896 List<RegionInfo>> adminCall(controller, stub, 897 RequestConverter.buildGetOnlineRegionRequest(), 898 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 899 resp -> ProtobufUtil.getRegionInfos(resp))) 900 .serverName(serverName).call(); 901 } 902 903 @Override 904 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 905 if (tableName.equals(META_TABLE_NAME)) { 906 return connection.registry.getMetaRegionLocations() 907 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion) 908 .collect(Collectors.toList())); 909 } else { 910 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply( 911 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 912 } 913 } 914 915 @Override 916 public CompletableFuture<Void> flush(TableName tableName) { 
917 return flush(tableName, null); 918 } 919 920 @Override 921 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { 922 CompletableFuture<Void> future = new CompletableFuture<>(); 923 addListener(tableExists(tableName), (exists, err) -> { 924 if (err != null) { 925 future.completeExceptionally(err); 926 } else if (!exists) { 927 future.completeExceptionally(new TableNotFoundException(tableName)); 928 } else { 929 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 930 if (err2 != null) { 931 future.completeExceptionally(err2); 932 } else if (!tableEnabled) { 933 future.completeExceptionally(new TableNotEnabledException(tableName)); 934 } else { 935 Map<String, String> props = new HashMap<>(); 936 if (columnFamily != null) { 937 props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); 938 } 939 addListener( 940 execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), 941 (ret, err3) -> { 942 if (err3 != null) { 943 future.completeExceptionally(err3); 944 } else { 945 future.complete(ret); 946 } 947 }); 948 } 949 }); 950 } 951 }); 952 return future; 953 } 954 955 @Override 956 public CompletableFuture<Void> flushRegion(byte[] regionName) { 957 return flushRegion(regionName, null); 958 } 959 960 @Override 961 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) { 962 CompletableFuture<Void> future = new CompletableFuture<>(); 963 addListener(getRegionLocation(regionName), (location, err) -> { 964 if (err != null) { 965 future.completeExceptionally(err); 966 return; 967 } 968 ServerName serverName = location.getServerName(); 969 if (serverName == null) { 970 future 971 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 972 return; 973 } 974 addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> { 975 if (err2 != null) { 976 future.completeExceptionally(err2); 977 } else { 978 future.complete(ret); 
979 } 980 }); 981 }); 982 return future; 983 } 984 985 private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo, 986 byte[] columnFamily) { 987 return this.<Void> newAdminCaller().serverName(serverName) 988 .action( 989 (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, 990 Void> adminCall(controller, stub, RequestConverter 991 .buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily, false), 992 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null)) 993 .call(); 994 } 995 996 @Override 997 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 998 CompletableFuture<Void> future = new CompletableFuture<>(); 999 addListener(getRegions(sn), (hRegionInfos, err) -> { 1000 if (err != null) { 1001 future.completeExceptionally(err); 1002 return; 1003 } 1004 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1005 if (hRegionInfos != null) { 1006 hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null))); 1007 } 1008 addListener(CompletableFuture.allOf( 1009 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1010 if (err2 != null) { 1011 future.completeExceptionally(err2); 1012 } else { 1013 future.complete(ret); 1014 } 1015 }); 1016 }); 1017 return future; 1018 } 1019 1020 @Override 1021 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 1022 return compact(tableName, null, false, compactType); 1023 } 1024 1025 @Override 1026 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 1027 CompactType compactType) { 1028 Preconditions.checkNotNull(columnFamily, "columnFamily is null. 
" 1029 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1030 return compact(tableName, columnFamily, false, compactType); 1031 } 1032 1033 @Override 1034 public CompletableFuture<Void> compactRegion(byte[] regionName) { 1035 return compactRegion(regionName, null, false); 1036 } 1037 1038 @Override 1039 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 1040 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1041 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 1042 return compactRegion(regionName, columnFamily, false); 1043 } 1044 1045 @Override 1046 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 1047 return compact(tableName, null, true, compactType); 1048 } 1049 1050 @Override 1051 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 1052 CompactType compactType) { 1053 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 1054 + "If you don't specify a columnFamily, use compact(TableName) instead"); 1055 return compact(tableName, columnFamily, true, compactType); 1056 } 1057 1058 @Override 1059 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 1060 return compactRegion(regionName, null, true); 1061 } 1062 1063 @Override 1064 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 1065 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
1066 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 1067 return compactRegion(regionName, columnFamily, true); 1068 } 1069 1070 @Override 1071 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 1072 return compactRegionServer(sn, false); 1073 } 1074 1075 @Override 1076 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 1077 return compactRegionServer(sn, true); 1078 } 1079 1080 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 1081 CompletableFuture<Void> future = new CompletableFuture<>(); 1082 addListener(getRegions(sn), (hRegionInfos, err) -> { 1083 if (err != null) { 1084 future.completeExceptionally(err); 1085 return; 1086 } 1087 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 1088 if (hRegionInfos != null) { 1089 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 1090 } 1091 addListener(CompletableFuture.allOf( 1092 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 1093 if (err2 != null) { 1094 future.completeExceptionally(err2); 1095 } else { 1096 future.complete(ret); 1097 } 1098 }); 1099 }); 1100 return future; 1101 } 1102 1103 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 1104 boolean major) { 1105 CompletableFuture<Void> future = new CompletableFuture<>(); 1106 addListener(getRegionLocation(regionName), (location, err) -> { 1107 if (err != null) { 1108 future.completeExceptionally(err); 1109 return; 1110 } 1111 ServerName serverName = location.getServerName(); 1112 if (serverName == null) { 1113 future 1114 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1115 return; 1116 } 1117 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 1118 (ret, err2) -> { 1119 if (err2 != null) { 1120 future.completeExceptionally(err2); 1121 } else { 
1122 future.complete(ret); 1123 } 1124 }); 1125 }); 1126 return future; 1127 } 1128 1129 /** 1130 * List all region locations for the specific table. 1131 */ 1132 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1133 if (TableName.META_TABLE_NAME.equals(tableName)) { 1134 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1135 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> { 1136 if (err != null) { 1137 future.completeExceptionally(err); 1138 } else if ( 1139 metaRegions == null || metaRegions.isEmpty() 1140 || metaRegions.getDefaultRegionLocation() == null 1141 ) { 1142 future.completeExceptionally(new IOException("meta region does not found")); 1143 } else { 1144 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1145 } 1146 }); 1147 return future; 1148 } else { 1149 // For non-meta table, we fetch all locations by scanning hbase:meta table 1150 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1151 } 1152 } 1153 1154 /** 1155 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1156 */ 1157 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1158 CompactType compactType) { 1159 CompletableFuture<Void> future = new CompletableFuture<>(); 1160 1161 switch (compactType) { 1162 case MOB: 1163 addListener(connection.registry.getActiveMaster(), (serverName, err) -> { 1164 if (err != null) { 1165 future.completeExceptionally(err); 1166 return; 1167 } 1168 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1169 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1170 if (err2 != null) { 1171 future.completeExceptionally(err2); 1172 } else { 1173 future.complete(ret); 1174 } 1175 }); 1176 }); 1177 break; 1178 case NORMAL: 1179 addListener(getTableHRegionLocations(tableName), 
(locations, err) -> { 1180 if (err != null) { 1181 future.completeExceptionally(err); 1182 return; 1183 } 1184 if (locations == null || locations.isEmpty()) { 1185 future.completeExceptionally(new TableNotFoundException(tableName)); 1186 } 1187 CompletableFuture<?>[] compactFutures = 1188 locations.stream().filter(l -> l.getRegion() != null) 1189 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1190 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1191 .toArray(CompletableFuture<?>[]::new); 1192 // future complete unless all of the compact futures are completed. 1193 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1194 if (err2 != null) { 1195 future.completeExceptionally(err2); 1196 } else { 1197 future.complete(ret); 1198 } 1199 }); 1200 }); 1201 break; 1202 default: 1203 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1204 } 1205 return future; 1206 } 1207 1208 /** 1209 * Compact the region at specific region server. 1210 */ 1211 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1212 final boolean major, byte[] columnFamily) { 1213 return this.<Void> newAdminCaller().serverName(sn) 1214 .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, 1215 Void> adminCall(controller, stub, 1216 RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily), 1217 (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) 1218 .call(); 1219 } 1220 1221 private byte[] toEncodeRegionName(byte[] regionName) { 1222 return RegionInfo.isEncodedRegionName(regionName) 1223 ? 
/**
 * Looks up one region and folds its table into the shared merge-validation state.
 * <p>
 * The caller invokes this once per region with the same {@code tableName} reference and the
 * same {@code result} future. The invocation whose compare-and-set succeeds only records its
 * table and leaves {@code result} untouched; any later invocation either completes
 * {@code result} with the recorded table (same table) or fails it (different table).
 * @param encodeRegionName region name passed to {@code getRegionLocation}
 * @param tableName        shared holder for the table of the first region seen
 * @param result           shared future completed with the common table or an error
 */
private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
  CompletableFuture<TableName> result) {
  addListener(getRegionLocation(encodeRegionName), (location, err) -> {
    if (err != null) {
      result.completeExceptionally(err);
      return;
    }
    RegionInfo regionInfo = location.getRegion();
    // Only default replicas may be merged directly.
    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
      result.completeExceptionally(
        new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
      return;
    }
    // compareAndSet fails when another region already recorded its table; compare against it.
    if (!tableName.compareAndSet(null, regionInfo.getTable())) {
      if (!tableName.get().equals(regionInfo.getTable())) {
        // All regions being merged must belong to the same table.
        result.completeExceptionally(
          new IllegalArgumentException("Cannot merge regions from two different tables "
            + tableName.get() + " and " + regionInfo.getTable()));
      } else {
        result.complete(tableName.get());
      }
    }
  });
}
drainSplits, MasterSwitchType.SPLIT); 1275 } 1276 1277 @Override 1278 public CompletableFuture<Boolean> isSplitEnabled() { 1279 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1280 } 1281 1282 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous, 1283 MasterSwitchType switchType) { 1284 SetSplitOrMergeEnabledRequest request = 1285 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType); 1286 return this.<Boolean> newMasterCaller() 1287 .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest, 1288 SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1289 (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done), 1290 (resp) -> resp.getPrevValueList().get(0))) 1291 .call(); 1292 } 1293 1294 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1295 IsSplitOrMergeEnabledRequest request = 1296 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1297 return this.<Boolean> newMasterCaller() 1298 .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest, 1299 IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request, 1300 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled())) 1301 .call(); 1302 } 1303 1304 @Override 1305 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) { 1306 if (nameOfRegionsToMerge.size() < 2) { 1307 return failedFuture(new IllegalArgumentException( 1308 "Can not merge only " + nameOfRegionsToMerge.size() + " region")); 1309 } 1310 CompletableFuture<Void> future = new CompletableFuture<>(); 1311 byte[][] encodedNameOfRegionsToMerge = 1312 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new); 1313 1314 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> { 1315 if (err != null) { 1316 future.completeExceptionally(err); 1317 return; 1318 } 1319 1320 
final MergeTableRegionsRequest request; 1321 try { 1322 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge, 1323 forcible, ng.getNonceGroup(), ng.newNonce()); 1324 } catch (DeserializationException e) { 1325 future.completeExceptionally(e); 1326 return; 1327 } 1328 1329 addListener( 1330 this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions, 1331 MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)), 1332 (ret, err2) -> { 1333 if (err2 != null) { 1334 future.completeExceptionally(err2); 1335 } else { 1336 future.complete(ret); 1337 } 1338 }); 1339 }); 1340 return future; 1341 } 1342 1343 @Override 1344 public CompletableFuture<Void> split(TableName tableName) { 1345 CompletableFuture<Void> future = new CompletableFuture<>(); 1346 addListener(tableExists(tableName), (exist, error) -> { 1347 if (error != null) { 1348 future.completeExceptionally(error); 1349 return; 1350 } 1351 if (!exist) { 1352 future.completeExceptionally(new TableNotFoundException(tableName)); 1353 return; 1354 } 1355 addListener( 1356 metaTable 1357 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1358 .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) 1359 .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))), 1360 (results, err2) -> { 1361 if (err2 != null) { 1362 future.completeExceptionally(err2); 1363 return; 1364 } 1365 if (results != null && !results.isEmpty()) { 1366 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1367 for (Result r : results) { 1368 if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) { 1369 continue; 1370 } 1371 RegionLocations rl = MetaTableAccessor.getRegionLocations(r); 1372 if (rl != null) { 1373 for (HRegionLocation h : rl.getRegionLocations()) { 1374 if (h != null && h.getServerName() != null) { 1375 RegionInfo hri = h.getRegion(); 1376 if ( 
1377 hri == null || hri.isSplitParent() 1378 || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID 1379 ) { 1380 continue; 1381 } 1382 splitFutures.add(split(hri, null)); 1383 } 1384 } 1385 } 1386 } 1387 addListener( 1388 CompletableFuture 1389 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1390 (ret, exception) -> { 1391 if (exception != null) { 1392 future.completeExceptionally(exception); 1393 return; 1394 } 1395 future.complete(ret); 1396 }); 1397 } else { 1398 future.complete(null); 1399 } 1400 }); 1401 }); 1402 return future; 1403 } 1404 1405 @Override 1406 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1407 CompletableFuture<Void> result = new CompletableFuture<>(); 1408 if (splitPoint == null) { 1409 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1410 } 1411 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1412 (loc, err) -> { 1413 if (err != null) { 1414 result.completeExceptionally(err); 1415 } else if (loc == null || loc.getRegion() == null) { 1416 result.completeExceptionally(new IllegalArgumentException( 1417 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1418 } else { 1419 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1420 if (err2 != null) { 1421 result.completeExceptionally(err2); 1422 } else { 1423 result.complete(ret); 1424 } 1425 1426 }); 1427 } 1428 }); 1429 return result; 1430 } 1431 1432 @Override 1433 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1434 CompletableFuture<Void> future = new CompletableFuture<>(); 1435 addListener(getRegionLocation(regionName), (location, err) -> { 1436 if (err != null) { 1437 future.completeExceptionally(err); 1438 return; 1439 } 1440 RegionInfo regionInfo = location.getRegion(); 1441 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1442 future.completeExceptionally(new 
IllegalArgumentException("Can't split replicas directly. " 1443 + "Replicas are auto-split when their primary is split.")); 1444 return; 1445 } 1446 ServerName serverName = location.getServerName(); 1447 if (serverName == null) { 1448 future 1449 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1450 return; 1451 } 1452 addListener(split(regionInfo, null), (ret, err2) -> { 1453 if (err2 != null) { 1454 future.completeExceptionally(err2); 1455 } else { 1456 future.complete(ret); 1457 } 1458 }); 1459 }); 1460 return future; 1461 } 1462 1463 @Override 1464 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1465 Preconditions.checkNotNull(splitPoint, 1466 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1467 CompletableFuture<Void> future = new CompletableFuture<>(); 1468 addListener(getRegionLocation(regionName), (location, err) -> { 1469 if (err != null) { 1470 future.completeExceptionally(err); 1471 return; 1472 } 1473 RegionInfo regionInfo = location.getRegion(); 1474 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1475 future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. 
" 1476 + "Replicas are auto-split when their primary is split.")); 1477 return; 1478 } 1479 ServerName serverName = location.getServerName(); 1480 if (serverName == null) { 1481 future 1482 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1483 return; 1484 } 1485 if ( 1486 regionInfo.getStartKey() != null 1487 && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0 1488 ) { 1489 future.completeExceptionally( 1490 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1491 return; 1492 } 1493 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1494 if (err2 != null) { 1495 future.completeExceptionally(err2); 1496 } else { 1497 future.complete(ret); 1498 } 1499 }); 1500 }); 1501 return future; 1502 } 1503 1504 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1505 CompletableFuture<Void> future = new CompletableFuture<>(); 1506 TableName tableName = hri.getTable(); 1507 final SplitTableRegionRequest request; 1508 try { 1509 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1510 ng.newNonce()); 1511 } catch (DeserializationException e) { 1512 future.completeExceptionally(e); 1513 return future; 1514 } 1515 1516 addListener( 1517 this.procedureCall(tableName, request, MasterService.Interface::splitRegion, 1518 SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)), 1519 (ret, err2) -> { 1520 if (err2 != null) { 1521 future.completeExceptionally(err2); 1522 } else { 1523 future.complete(ret); 1524 } 1525 }); 1526 return future; 1527 } 1528 1529 @Override 1530 public CompletableFuture<Void> assign(byte[] regionName) { 1531 CompletableFuture<Void> future = new CompletableFuture<>(); 1532 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1533 if (err != null) { 1534 future.completeExceptionally(err); 1535 return; 1536 } 1537 addListener( 1538 this.<Void> 
newMasterCaller().priority(regionInfo.getTable()) 1539 .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1540 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1541 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) 1542 .call(), 1543 (ret, err2) -> { 1544 if (err2 != null) { 1545 future.completeExceptionally(err2); 1546 } else { 1547 future.complete(ret); 1548 } 1549 }); 1550 }); 1551 return future; 1552 } 1553 1554 @Override 1555 public CompletableFuture<Void> unassign(byte[] regionName) { 1556 CompletableFuture<Void> future = new CompletableFuture<>(); 1557 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1558 if (err != null) { 1559 future.completeExceptionally(err); 1560 return; 1561 } 1562 addListener( 1563 this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1564 .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse, 1565 Void> call(controller, stub, 1566 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), 1567 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)) 1568 .call(), 1569 (ret, err2) -> { 1570 if (err2 != null) { 1571 future.completeExceptionally(err2); 1572 } else { 1573 future.complete(ret); 1574 } 1575 }); 1576 }); 1577 return future; 1578 } 1579 1580 @Override 1581 public CompletableFuture<Void> offline(byte[] regionName) { 1582 CompletableFuture<Void> future = new CompletableFuture<>(); 1583 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1584 if (err != null) { 1585 future.completeExceptionally(err); 1586 return; 1587 } 1588 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1589 .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( 1590 controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1591 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> 
null)) 1592 .call(), (ret, err2) -> { 1593 if (err2 != null) { 1594 future.completeExceptionally(err2); 1595 } else { 1596 future.complete(ret); 1597 } 1598 }); 1599 }); 1600 return future; 1601 } 1602 1603 @Override 1604 public CompletableFuture<Void> move(byte[] regionName) { 1605 CompletableFuture<Void> future = new CompletableFuture<>(); 1606 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1607 if (err != null) { 1608 future.completeExceptionally(err); 1609 return; 1610 } 1611 addListener( 1612 moveRegion(regionInfo, 1613 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1614 (ret, err2) -> { 1615 if (err2 != null) { 1616 future.completeExceptionally(err2); 1617 } else { 1618 future.complete(ret); 1619 } 1620 }); 1621 }); 1622 return future; 1623 } 1624 1625 @Override 1626 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1627 Preconditions.checkNotNull(destServerName, 1628 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1629 CompletableFuture<Void> future = new CompletableFuture<>(); 1630 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1631 if (err != null) { 1632 future.completeExceptionally(err); 1633 return; 1634 } 1635 addListener( 1636 moveRegion(regionInfo, RequestConverter 1637 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1638 (ret, err2) -> { 1639 if (err2 != null) { 1640 future.completeExceptionally(err2); 1641 } else { 1642 future.complete(ret); 1643 } 1644 }); 1645 }); 1646 return future; 1647 } 1648 1649 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { 1650 return this.<Void> newMasterCaller().priority(regionInfo.getTable()) 1651 .action( 1652 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1653 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) 1654 .call(); 1655 
} 1656 1657 @Override 1658 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1659 return this.<Void> newMasterCaller() 1660 .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1661 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1662 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)) 1663 .call(); 1664 } 1665 1666 @Override 1667 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1668 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1669 Scan scan = QuotaTableUtil.makeScan(filter); 1670 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan, 1671 new AdvancedScanResultConsumer() { 1672 List<QuotaSettings> settings = new ArrayList<>(); 1673 1674 @Override 1675 public void onNext(Result[] results, ScanController controller) { 1676 for (Result result : results) { 1677 try { 1678 QuotaTableUtil.parseResultToCollection(result, settings); 1679 } catch (IOException e) { 1680 controller.terminate(); 1681 future.completeExceptionally(e); 1682 } 1683 } 1684 } 1685 1686 @Override 1687 public void onError(Throwable error) { 1688 future.completeExceptionally(error); 1689 } 1690 1691 @Override 1692 public void onComplete() { 1693 future.complete(settings); 1694 } 1695 }); 1696 return future; 1697 } 1698 1699 @Override 1700 public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, 1701 boolean enabled) { 1702 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1703 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1704 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1705 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1706 } 1707 1708 @Override 1709 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1710 return this.<RemoveReplicationPeerRequest, 
RemoveReplicationPeerResponse> procedureCall( 1711 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1712 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1713 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1714 } 1715 1716 @Override 1717 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1718 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1719 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1720 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1721 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1722 } 1723 1724 @Override 1725 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1726 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1727 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1728 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1729 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1730 } 1731 1732 @Override 1733 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1734 return this.<ReplicationPeerConfig> newMasterCaller() 1735 .action((controller, stub) -> this.<GetReplicationPeerConfigRequest, 1736 GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub, 1737 RequestConverter.buildGetReplicationPeerConfigRequest(peerId), 1738 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1739 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))) 1740 .call(); 1741 } 1742 1743 @Override 1744 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1745 ReplicationPeerConfig peerConfig) { 1746 return this.<UpdateReplicationPeerConfigRequest, 1747 UpdateReplicationPeerConfigResponse> procedureCall( 1748 
RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1749 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1750 (resp) -> resp.getProcId(), 1751 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1752 } 1753 1754 @Override 1755 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1756 Map<TableName, List<String>> tableCfs) { 1757 if (tableCfs == null) { 1758 return failedFuture(new ReplicationException("tableCfs is null")); 1759 } 1760 1761 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1762 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1763 if (!completeExceptionally(future, error)) { 1764 ReplicationPeerConfig newPeerConfig = 1765 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1766 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1767 if (!completeExceptionally(future, error)) { 1768 future.complete(result); 1769 } 1770 }); 1771 } 1772 }); 1773 return future; 1774 } 1775 1776 @Override 1777 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1778 Map<TableName, List<String>> tableCfs) { 1779 if (tableCfs == null) { 1780 return failedFuture(new ReplicationException("tableCfs is null")); 1781 } 1782 1783 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1784 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1785 if (!completeExceptionally(future, error)) { 1786 ReplicationPeerConfig newPeerConfig = null; 1787 try { 1788 newPeerConfig = ReplicationPeerConfigUtil 1789 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1790 } catch (ReplicationException e) { 1791 future.completeExceptionally(e); 1792 return; 1793 } 1794 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1795 if (!completeExceptionally(future, error)) { 1796 future.complete(result); 1797 } 1798 }); 1799 } 
1800 }); 1801 return future; 1802 } 1803 1804 @Override 1805 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1806 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1807 } 1808 1809 @Override 1810 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1811 Preconditions.checkNotNull(pattern, 1812 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1813 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1814 } 1815 1816 private CompletableFuture<List<ReplicationPeerDescription>> 1817 listReplicationPeers(ListReplicationPeersRequest request) { 1818 return this.<List<ReplicationPeerDescription>> newMasterCaller() 1819 .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse, 1820 List<ReplicationPeerDescription>> call(controller, stub, request, 1821 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1822 (resp) -> resp.getPeerDescList().stream() 1823 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1824 .collect(Collectors.toList()))) 1825 .call(); 1826 } 1827 1828 @Override 1829 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1830 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1831 addListener(listTableDescriptors(), (tables, error) -> { 1832 if (!completeExceptionally(future, error)) { 1833 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1834 tables.forEach(table -> { 1835 Map<String, Integer> cfs = new HashMap<>(); 1836 Stream.of(table.getColumnFamilies()) 1837 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1838 .forEach(column -> { 1839 cfs.put(column.getNameAsString(), column.getScope()); 1840 }); 1841 if (!cfs.isEmpty()) { 1842 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1843 } 1844 }); 1845 
future.complete(replicatedTableCFs); 1846 } 1847 }); 1848 return future; 1849 } 1850 1851 @Override 1852 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1853 SnapshotProtos.SnapshotDescription snapshot = 1854 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1855 try { 1856 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1857 } catch (IllegalArgumentException e) { 1858 return failedFuture(e); 1859 } 1860 CompletableFuture<Void> future = new CompletableFuture<>(); 1861 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); 1862 addListener(this.<Long> newMasterCaller() 1863 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, 1864 stub, request, (s, c, req, done) -> s.snapshot(c, req, done), 1865 resp -> resp.getExpectedTimeout())) 1866 .call(), (expectedTimeout, err) -> { 1867 if (err != null) { 1868 future.completeExceptionally(err); 1869 return; 1870 } 1871 TimerTask pollingTask = new TimerTask() { 1872 int tries = 0; 1873 long startTime = EnvironmentEdgeManager.currentTime(); 1874 long endTime = startTime + expectedTimeout; 1875 long maxPauseTime = expectedTimeout / maxAttempts; 1876 1877 @Override 1878 public void run(Timeout timeout) throws Exception { 1879 if (EnvironmentEdgeManager.currentTime() < endTime) { 1880 addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> { 1881 if (err2 != null) { 1882 future.completeExceptionally(err2); 1883 } else if (done) { 1884 future.complete(null); 1885 } else { 1886 // retry again after pauseTime. 
1887 long pauseTime = 1888 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 1889 pauseTime = Math.min(pauseTime, maxPauseTime); 1890 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 1891 TimeUnit.MILLISECONDS); 1892 } 1893 }); 1894 } else { 1895 future.completeExceptionally( 1896 new SnapshotCreationException("Snapshot '" + snapshot.getName() 1897 + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc)); 1898 } 1899 } 1900 }; 1901 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 1902 }); 1903 return future; 1904 } 1905 1906 @Override 1907 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 1908 return this.<Boolean> newMasterCaller() 1909 .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, 1910 Boolean> call(controller, stub, 1911 IsSnapshotDoneRequest.newBuilder() 1912 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 1913 (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) 1914 .call(); 1915 } 1916 1917 @Override 1918 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 1919 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 1920 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 1921 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 1922 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 1923 } 1924 1925 @Override 1926 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot, 1927 boolean restoreAcl) { 1928 CompletableFuture<Void> future = new CompletableFuture<>(); 1929 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 1930 if (err != null) { 1931 future.completeExceptionally(err); 1932 return; 1933 } 1934 TableName tableName = null; 1935 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) 
{ 1936 for (SnapshotDescription snap : snapshotDescriptions) { 1937 if (snap.getName().equals(snapshotName)) { 1938 tableName = snap.getTableName(); 1939 break; 1940 } 1941 } 1942 } 1943 if (tableName == null) { 1944 future.completeExceptionally(new RestoreSnapshotException( 1945 "Unable to find the table name for snapshot=" + snapshotName)); 1946 return; 1947 } 1948 final TableName finalTableName = tableName; 1949 addListener(tableExists(finalTableName), (exists, err2) -> { 1950 if (err2 != null) { 1951 future.completeExceptionally(err2); 1952 } else if (!exists) { 1953 // if table does not exist, then just clone snapshot into new table. 1954 completeConditionalOnFuture(future, 1955 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null)); 1956 } else { 1957 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 1958 if (err4 != null) { 1959 future.completeExceptionally(err4); 1960 } else if (!disabled) { 1961 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 1962 } else { 1963 completeConditionalOnFuture(future, 1964 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl)); 1965 } 1966 }); 1967 } 1968 }); 1969 }); 1970 return future; 1971 } 1972 1973 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 1974 boolean takeFailSafeSnapshot, boolean restoreAcl) { 1975 if (takeFailSafeSnapshot) { 1976 CompletableFuture<Void> future = new CompletableFuture<>(); 1977 // Step.1 Take a snapshot of the current state 1978 String failSafeSnapshotSnapshotNameFormat = 1979 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 1980 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 1981 final String failSafeSnapshotSnapshotName = 1982 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 1983 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 1984 .replace("{restore.timestamp}", 
String.valueOf(EnvironmentEdgeManager.currentTime())); 1985 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1986 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 1987 if (err != null) { 1988 future.completeExceptionally(err); 1989 } else { 1990 // Step.2 Restore snapshot 1991 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null), 1992 (void2, err2) -> { 1993 if (err2 != null) { 1994 // Step.3.a Something went wrong during the restore and try to rollback. 1995 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, 1996 restoreAcl, null), (void3, err3) -> { 1997 if (err3 != null) { 1998 future.completeExceptionally(err3); 1999 } else { 2000 String msg = 2001 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" 2002 + failSafeSnapshotSnapshotName + " succeeded."; 2003 future.completeExceptionally(new RestoreSnapshotException(msg, err2)); 2004 } 2005 }); 2006 } else { 2007 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 
2008 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 2009 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 2010 if (err3 != null) { 2011 LOG.error( 2012 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 2013 err3); 2014 future.completeExceptionally(err3); 2015 } else { 2016 future.complete(ret3); 2017 } 2018 }); 2019 } 2020 }); 2021 } 2022 }); 2023 return future; 2024 } else { 2025 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null); 2026 } 2027 } 2028 2029 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 2030 CompletableFuture<T> parentFuture) { 2031 addListener(parentFuture, (res, err) -> { 2032 if (err != null) { 2033 dependentFuture.completeExceptionally(err); 2034 } else { 2035 dependentFuture.complete(res); 2036 } 2037 }); 2038 } 2039 2040 @Override 2041 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName, 2042 boolean restoreAcl, String customSFT) { 2043 CompletableFuture<Void> future = new CompletableFuture<>(); 2044 addListener(tableExists(tableName), (exists, err) -> { 2045 if (err != null) { 2046 future.completeExceptionally(err); 2047 } else if (exists) { 2048 future.completeExceptionally(new TableExistsException(tableName)); 2049 } else { 2050 completeConditionalOnFuture(future, 2051 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT)); 2052 } 2053 }); 2054 return future; 2055 } 2056 2057 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName, 2058 boolean restoreAcl, String customSFT) { 2059 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 2060 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 2061 try { 2062 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 2063 } catch (IllegalArgumentException e) { 2064 return failedFuture(e); 2065 
} 2066 RestoreSnapshotRequest.Builder builder = 2067 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) 2068 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl); 2069 if (customSFT != null) { 2070 builder.setCustomSFT(customSFT); 2071 } 2072 return waitProcedureResult(this.<Long> newMasterCaller() 2073 .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, 2074 Long> call(controller, stub, builder.build(), 2075 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) 2076 .call()); 2077 } 2078 2079 @Override 2080 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 2081 return getCompletedSnapshots(null); 2082 } 2083 2084 @Override 2085 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 2086 Preconditions.checkNotNull(pattern, 2087 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 2088 return getCompletedSnapshots(pattern); 2089 } 2090 2091 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 2092 return this.<List<SnapshotDescription>> newMasterCaller() 2093 .action((controller, stub) -> this.<GetCompletedSnapshotsRequest, 2094 GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub, 2095 GetCompletedSnapshotsRequest.newBuilder().build(), 2096 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 2097 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 2098 .call(); 2099 } 2100 2101 @Override 2102 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 2103 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 
2104 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 2105 return getCompletedSnapshots(tableNamePattern, null); 2106 } 2107 2108 @Override 2109 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 2110 Pattern snapshotNamePattern) { 2111 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2112 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 2113 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2114 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 2115 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2116 } 2117 2118 private CompletableFuture<List<SnapshotDescription>> 2119 getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { 2120 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2121 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2122 if (err != null) { 2123 future.completeExceptionally(err); 2124 return; 2125 } 2126 if (tableNames == null || tableNames.size() <= 0) { 2127 future.complete(Collections.emptyList()); 2128 return; 2129 } 2130 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2131 if (err2 != null) { 2132 future.completeExceptionally(err2); 2133 return; 2134 } 2135 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2136 future.complete(Collections.emptyList()); 2137 return; 2138 } 2139 future.complete(snapshotDescList.stream() 2140 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2141 .collect(Collectors.toList())); 2142 }); 2143 }); 2144 return future; 2145 } 2146 2147 @Override 2148 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2149 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2150 } 2151 2152 @Override 2153 public 
CompletableFuture<Void> deleteSnapshots() { 2154 return internalDeleteSnapshots(null, null); 2155 } 2156 2157 @Override 2158 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2159 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2160 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2161 return internalDeleteSnapshots(null, snapshotNamePattern); 2162 } 2163 2164 @Override 2165 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2166 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2167 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2168 return internalDeleteSnapshots(tableNamePattern, null); 2169 } 2170 2171 @Override 2172 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2173 Pattern snapshotNamePattern) { 2174 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2175 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2176 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
2177 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2178 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2179 } 2180 2181 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2182 Pattern snapshotNamePattern) { 2183 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2184 if (tableNamePattern == null) { 2185 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2186 } else { 2187 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2188 } 2189 CompletableFuture<Void> future = new CompletableFuture<>(); 2190 addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> { 2191 if (err != null) { 2192 future.completeExceptionally(err); 2193 return; 2194 } 2195 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2196 future.complete(null); 2197 return; 2198 } 2199 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2200 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2201 if (e != null) { 2202 future.completeExceptionally(e); 2203 } else { 2204 future.complete(v); 2205 } 2206 }); 2207 }); 2208 return future; 2209 } 2210 2211 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2212 return this.<Void> newMasterCaller() 2213 .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2214 controller, stub, 2215 DeleteSnapshotRequest.newBuilder() 2216 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), 2217 (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) 2218 .call(); 2219 } 2220 2221 @Override 2222 public CompletableFuture<Void> execProcedure(String signature, String instance, 2223 Map<String, String> props) { 2224 CompletableFuture<Void> future = new CompletableFuture<>(); 2225 ProcedureDescription procDesc = 2226 
ProtobufUtil.buildProcedureDescription(signature, instance, props); 2227 addListener(this.<Long> newMasterCaller() 2228 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2229 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2230 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2231 .call(), (expectedTimeout, err) -> { 2232 if (err != null) { 2233 future.completeExceptionally(err); 2234 return; 2235 } 2236 TimerTask pollingTask = new TimerTask() { 2237 int tries = 0; 2238 long startTime = EnvironmentEdgeManager.currentTime(); 2239 long endTime = startTime + expectedTimeout; 2240 long maxPauseTime = expectedTimeout / maxAttempts; 2241 2242 @Override 2243 public void run(Timeout timeout) throws Exception { 2244 if (EnvironmentEdgeManager.currentTime() < endTime) { 2245 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2246 if (err2 != null) { 2247 future.completeExceptionally(err2); 2248 return; 2249 } 2250 if (done) { 2251 future.complete(null); 2252 } else { 2253 // retry again after pauseTime. 2254 long pauseTime = 2255 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2256 pauseTime = Math.min(pauseTime, maxPauseTime); 2257 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2258 TimeUnit.MICROSECONDS); 2259 } 2260 }); 2261 } else { 2262 future.completeExceptionally(new IOException("Procedure '" + signature + " : " 2263 + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2264 } 2265 } 2266 }; 2267 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 
2268 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2269 }); 2270 return future; 2271 } 2272 2273 @Override 2274 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2275 Map<String, String> props) { 2276 ProcedureDescription proDesc = 2277 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2278 return this.<byte[]> newMasterCaller() 2279 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2280 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2281 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2282 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2283 .call(); 2284 } 2285 2286 @Override 2287 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2288 Map<String, String> props) { 2289 ProcedureDescription proDesc = 2290 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2291 return this.<Boolean> newMasterCaller() 2292 .action( 2293 (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call( 2294 controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2295 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2296 .call(); 2297 } 2298 2299 @Override 2300 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2301 return this.<Boolean> newMasterCaller().action( 2302 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2303 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2304 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2305 .call(); 2306 } 2307 2308 @Override 2309 public CompletableFuture<String> getProcedures() { 2310 return this.<String> newMasterCaller() 2311 
.action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call( 2312 controller, stub, GetProceduresRequest.newBuilder().build(), 2313 (s, c, req, done) -> s.getProcedures(c, req, done), 2314 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))) 2315 .call(); 2316 } 2317 2318 @Override 2319 public CompletableFuture<String> getLocks() { 2320 return this.<String> newMasterCaller() 2321 .action( 2322 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller, 2323 stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done), 2324 resp -> ProtobufUtil.toLockJson(resp.getLockList()))) 2325 .call(); 2326 } 2327 2328 @Override 2329 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, 2330 boolean offload) { 2331 return this.<Void> newMasterCaller() 2332 .action((controller, stub) -> this.<DecommissionRegionServersRequest, 2333 DecommissionRegionServersResponse, Void> call(controller, stub, 2334 RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2335 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2336 .call(); 2337 } 2338 2339 @Override 2340 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2341 return this.<List<ServerName>> newMasterCaller() 2342 .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest, 2343 ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub, 2344 ListDecommissionedRegionServersRequest.newBuilder().build(), 2345 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2346 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2347 .collect(Collectors.toList()))) 2348 .call(); 2349 } 2350 2351 @Override 2352 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2353 List<byte[]> encodedRegionNames) { 2354 return this.<Void> newMasterCaller() 2355 
.action((controller, stub) -> this.<RecommissionRegionServerRequest, 2356 RecommissionRegionServerResponse, Void> call(controller, stub, 2357 RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2358 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2359 .call(); 2360 } 2361 2362 /** 2363 * Get the region location for the passed region name. The region name may be a full region name 2364 * or encoded region name. If the region does not found, then it'll throw an 2365 * UnknownRegionException wrapped by a {@link CompletableFuture} 2366 * @param regionNameOrEncodedRegionName region name or encoded region name 2367 * @return region location, wrapped by a {@link CompletableFuture} 2368 */ 2369 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2370 if (regionNameOrEncodedRegionName == null) { 2371 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2372 } 2373 2374 CompletableFuture<Optional<HRegionLocation>> future; 2375 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2376 String encodedName = Bytes.toString(regionNameOrEncodedRegionName); 2377 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) { 2378 // old format encodedName, should be meta region 2379 future = connection.registry.getMetaRegionLocations() 2380 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2381 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst()); 2382 } else { 2383 future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2384 regionNameOrEncodedRegionName); 2385 } 2386 } else { 2387 // Not all regionNameOrEncodedRegionName here is going to be a valid region name, 2388 // it needs to throw out IllegalArgumentException in case tableName is passed in. 
2389 RegionInfo regionInfo; 2390 try { 2391 regionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName); 2392 } catch (IOException ioe) { 2393 return failedFuture(new IllegalArgumentException(ioe.getMessage())); 2394 } 2395 2396 if (regionInfo.isMetaRegion()) { 2397 future = connection.registry.getMetaRegionLocations() 2398 .thenApply(locs -> Stream.of(locs.getRegionLocations()) 2399 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId()) 2400 .findFirst()); 2401 } else { 2402 future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2403 } 2404 } 2405 2406 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2407 addListener(future, (location, err) -> { 2408 if (err != null) { 2409 returnedFuture.completeExceptionally(err); 2410 return; 2411 } 2412 if (!location.isPresent() || location.get().getRegion() == null) { 2413 returnedFuture.completeExceptionally( 2414 new UnknownRegionException("Invalid region name or encoded region name: " 2415 + Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2416 } else { 2417 returnedFuture.complete(location.get()); 2418 } 2419 }); 2420 return returnedFuture; 2421 } 2422 2423 /** 2424 * Get the region info for the passed region name. The region name may be a full region name or 2425 * encoded region name. 
If the region does not found, then it'll throw an UnknownRegionException 2426 * wrapped by a {@link CompletableFuture} 2427 * @return region info, wrapped by a {@link CompletableFuture} 2428 */ 2429 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2430 if (regionNameOrEncodedRegionName == null) { 2431 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2432 } 2433 2434 if ( 2435 Bytes.equals(regionNameOrEncodedRegionName, 2436 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) 2437 || Bytes.equals(regionNameOrEncodedRegionName, 2438 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes()) 2439 ) { 2440 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2441 } 2442 2443 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2444 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2445 if (err != null) { 2446 future.completeExceptionally(err); 2447 } else { 2448 future.complete(location.getRegion()); 2449 } 2450 }); 2451 return future; 2452 } 2453 2454 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2455 if (numRegions < 3) { 2456 throw new IllegalArgumentException("Must create at least three regions"); 2457 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2458 throw new IllegalArgumentException("Start key must be smaller than end key"); 2459 } 2460 if (numRegions == 3) { 2461 return new byte[][] { startKey, endKey }; 2462 } 2463 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2464 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2465 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2466 } 2467 return splitKeys; 2468 } 2469 2470 private void verifySplitKeys(byte[][] splitKeys) { 2471 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2472 // Verify there are no duplicate split keys 2473 byte[] lastKey = 
null;
    for (byte[] splitKey : splitKeys) {
      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
      }
      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
      }
      lastKey = splitKey;
    }
  }

  /**
   * Completion callback for a procedure future: dispatches to {@link #onError(Throwable)} when an
   * error is present and to {@link #onFinished()} otherwise.
   */
  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {

    /** Invoked when the procedure completed without an error. */
    abstract void onFinished();

    /** Invoked with the error the procedure completed with. */
    abstract void onError(Throwable error);

    @Override
    public void accept(Void v, Throwable error) {
      if (error == null) {
        onFinished();
      } else {
        onError(error);
      }
    }
  }

  /** Callback for table-scoped procedures; logs the outcome together with the table name. */
  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
    protected final TableName tableName;

    TableProcedureBiConsumer(TableName tableName) {
      this.tableName = tableName;
    }

    /** Returns the operation label used in the log description. */
    abstract String getOperationType();

    /** Builds the log prefix from the operation label and the table name. */
    String getDescription() {
      String operation = getOperationType();
      String table = tableName.getNameWithNamespaceInclAsString();
      return "Operation: " + operation + ", " + "Table Name: " + table;
    }

    @Override
    void onFinished() {
      String description = getDescription();
      LOG.info(description + " completed");
    }

    @Override
    void onError(Throwable error) {
      String description = getDescription();
      LOG.info(description + " failed with " + error.getMessage());
    }
  }

  /** Callback for namespace-scoped procedures; logs the outcome together with the namespace. */
  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
    protected final String namespaceName;

    NamespaceProcedureBiConsumer(String namespaceName) {
      this.namespaceName = namespaceName;
    }

    /** Returns the operation label used in the log description. */
    abstract String getOperationType();

    /** Builds the log prefix from the operation label and the namespace name. */
    String getDescription() {
      String operation = getOperationType();
      return "Operation: " + operation + ", Namespace: " + namespaceName;
    }

    @Override
    void onFinished() {
      String description = getDescription();
      LOG.info(description + " completed");
    }

    @Override
    void onError(Throwable error) {
      String description = getDescription();
      LOG.info(description + " failed with " + error.getMessage());
    }
  }

  /** Table callback labelled {@code CREATE}. */
  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {

    CreateTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "CREATE";
    }
  }

  /** Table callback labelled {@code MODIFY}; the admin argument is not used. */
  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {

    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY";
    }
  }

  /**
   * Table callback labelled {@code MODIFY_TABLE_STORE_FILE_TRACKER}; the admin argument is not
   * used.
   */
  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
    extends TableProcedureBiConsumer {

    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_TABLE_STORE_FILE_TRACKER";
    }
  }

  /**
   * Table callback labelled {@code DELETE}. Non-static because, on success, it clears the
   * enclosing admin's connection locator cache for the table before logging.
   */
  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {

    DeleteTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "DELETE";
    }

    @Override
    void onFinished() {
      // Clear the cached locations first, then log completion.
      connection.getLocator().clearCache(this.tableName);
      super.onFinished();
    }
  }

  /** Table callback labelled {@code TRUNCATE}. */
  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {

    TruncateTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "TRUNCATE";
    }
  }

  /** Table callback labelled {@code ENABLE}. */
  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {

    EnableTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "ENABLE";
    }
  }

  /** Table callback labelled {@code DISABLE}. */
  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {

    DisableTableProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "DISABLE";
    }
  }

  /** Table callback labelled {@code ADD_COLUMN_FAMILY}. */
  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {

    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "ADD_COLUMN_FAMILY";
    }
  }

  /** Table callback labelled {@code DELETE_COLUMN_FAMILY}. */
  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {

    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "DELETE_COLUMN_FAMILY";
    }
  }

  /** Table callback labelled {@code MODIFY_COLUMN_FAMILY}. */
  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {

    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_COLUMN_FAMILY";
    }
  }

  /** Table callback labelled {@code MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER}. */
  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
    extends TableProcedureBiConsumer {

    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
    }
  }

  /** Namespace callback labelled {@code CREATE_NAMESPACE}. */
  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

    CreateNamespaceProcedureBiConsumer(String namespaceName) {
      super(namespaceName);
    }

    @Override
    String getOperationType() {
      return "CREATE_NAMESPACE";
    }
  }

  /** Namespace callback labelled {@code DELETE_NAMESPACE}. */
  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
      super(namespaceName);
    }

    @Override
    String getOperationType() {
      return "DELETE_NAMESPACE";
    }
  }

  /** Namespace callback labelled {@code MODIFY_NAMESPACE}. */
  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
      super(namespaceName);
    }

    @Override
    String getOperationType() {
      return "MODIFY_NAMESPACE";
    }
  }

  /** Table callback labelled {@code MERGE_REGIONS}. */
  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {

    MergeTableRegionProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "MERGE_REGIONS";
    }
  }

  /** Table callback labelled {@code SPLIT_REGION}. */
  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {

    SplitTableRegionProcedureBiConsumer(TableName tableName) {
      super(tableName);
    }

    @Override
    String getOperationType() {
      return "SPLIT_REGION";
    }
  }

  /**
   * Callback for replication peer procedures; logs the outcome together with the peer id. The
   * operation label is obtained from the supplier each time a description is built.
   */
  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
    private final String peerId;
    private final Supplier<String> getOperation;

    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
      this.peerId = peerId;
      this.getOperation = getOperation;
    }

    /** Builds the log prefix from the supplied operation label and the peer id. */
    String getDescription() {
      String operation = getOperation.get();
      return "Operation: " + operation + ", peerId: " + peerId;
    }

    @Override
    void onFinished() {
      String description = getDescription();
      LOG.info(description + " completed");
    }

    @Override
    void onError(Throwable error) {
      String description = getDescription();
      LOG.info(description + " failed with " + error.getMessage());
    }
  }

  private CompletableFuture<Void>
waitProcedureResult(CompletableFuture<Long> procFuture) { 2776 CompletableFuture<Void> future = new CompletableFuture<>(); 2777 addListener(procFuture, (procId, error) -> { 2778 if (error != null) { 2779 future.completeExceptionally(error); 2780 return; 2781 } 2782 getProcedureResult(procId, future, 0); 2783 }); 2784 return future; 2785 } 2786 2787 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2788 addListener( 2789 this.<GetProcedureResultResponse> newMasterCaller() 2790 .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, 2791 GetProcedureResultResponse> call(controller, stub, 2792 GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2793 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2794 .call(), 2795 (response, error) -> { 2796 if (error != null) { 2797 LOG.warn("failed to get the procedure result procId={}", procId, 2798 ConnectionUtils.translateException(error)); 2799 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2800 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2801 return; 2802 } 2803 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2804 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2805 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2806 return; 2807 } 2808 if (response.hasException()) { 2809 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2810 future.completeExceptionally(ioe); 2811 } else { 2812 future.complete(null); 2813 } 2814 }); 2815 } 2816 2817 private <T> CompletableFuture<T> failedFuture(Throwable error) { 2818 CompletableFuture<T> future = new CompletableFuture<>(); 2819 future.completeExceptionally(error); 2820 return future; 2821 } 2822 2823 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2824 if (error != null) { 2825 
future.completeExceptionally(error); 2826 return true; 2827 } 2828 return false; 2829 } 2830 2831 @Override 2832 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2833 return getClusterMetrics(EnumSet.allOf(Option.class)); 2834 } 2835 2836 @Override 2837 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2838 return this.<ClusterMetrics> newMasterCaller() 2839 .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse, 2840 ClusterMetrics> call(controller, stub, 2841 RequestConverter.buildGetClusterStatusRequest(options), 2842 (s, c, req, done) -> s.getClusterStatus(c, req, done), 2843 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))) 2844 .call(); 2845 } 2846 2847 @Override 2848 public CompletableFuture<Void> shutdown() { 2849 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2850 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2851 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), 2852 resp -> null)) 2853 .call(); 2854 } 2855 2856 @Override 2857 public CompletableFuture<Void> stopMaster() { 2858 return this.<Void> newMasterCaller().priority(HIGH_QOS) 2859 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call( 2860 controller, stub, StopMasterRequest.newBuilder().build(), 2861 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) 2862 .call(); 2863 } 2864 2865 @Override 2866 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2867 StopServerRequest request = RequestConverter 2868 .buildStopServerRequest("Called by admin client " + this.connection.toString()); 2869 return this.<Void> newAdminCaller().priority(HIGH_QOS) 2870 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2871 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2872 resp -> 
null)) 2873 .serverName(serverName).call(); 2874 } 2875 2876 @Override 2877 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2878 return this.<Void> newAdminCaller() 2879 .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse, 2880 Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2881 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2882 .serverName(serverName).call(); 2883 } 2884 2885 @Override 2886 public CompletableFuture<Void> updateConfiguration() { 2887 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2888 addListener( 2889 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)), 2890 (status, err) -> { 2891 if (err != null) { 2892 future.completeExceptionally(err); 2893 } else { 2894 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2895 status.getServersName().forEach(server -> futures.add(updateConfiguration(server))); 2896 futures.add(updateConfiguration(status.getMasterName())); 2897 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2898 addListener( 2899 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2900 (result, err2) -> { 2901 if (err2 != null) { 2902 future.completeExceptionally(err2); 2903 } else { 2904 future.complete(result); 2905 } 2906 }); 2907 } 2908 }); 2909 return future; 2910 } 2911 2912 @Override 2913 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 2914 return this.<Void> newAdminCaller() 2915 .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, 2916 Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(), 2917 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 2918 .serverName(serverName).call(); 2919 } 2920 2921 @Override 2922 public CompletableFuture<Void> 
clearCompactionQueues(ServerName serverName, Set<String> queues) {
    // Builds the request from the queue names and sends it to the given server.
    return this.<Void> newAdminCaller()
      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
          RequestConverter.buildClearCompactionQueuesRequest(queues),
          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
      .serverName(serverName).call();
  }

  /**
   * Fetches the security capabilities from the master.
   * @return the capabilities list of the response converted by
   *         {@code ProtobufUtil.toSecurityCapabilityList}, wrapped by a {@link CompletableFuture}
   */
  @Override
  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
    return this.<List<SecurityCapability>> newMasterCaller()
      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
        List<SecurityCapability>> call(controller, stub,
          SecurityCapabilitiesRequest.newBuilder().build(),
          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
      .call();
  }

  /**
   * Fetches region metrics from a server using an empty (unfiltered) region load request.
   * @param serverName the server the admin call is addressed to
   */
  @Override
  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
  }

  /**
   * Fetches region metrics from a server using a region load request built for one table.
   * @param serverName the server the admin call is addressed to
   * @param tableName  the table the request is built for, must not be null
   */
  @Override
  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
    TableName tableName) {
    // NOTE(review): the message points at getRegionLoads(), which is not defined in the visible
    // code; the unfiltered overload here is getRegionMetrics(ServerName) - confirm and update.
    Preconditions.checkNotNull(tableName,
      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
  }

  /**
   * Sends the given region load request to a server and converts the response with
   * {@code RegionMetricsBuilder.toRegionMetrics}.
   * @param request    the prepared region load request
   * @param serverName the server the admin call is addressed to
   */
  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
    ServerName serverName) {
    return this.<List<RegionMetrics>> newAdminCaller()
      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
        List<RegionMetrics>> adminCall(controller, stub, request,
          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
          RegionMetricsBuilder::toRegionMetrics))
      .serverName(serverName).call();
  }

  /**
   * Asks the master whether it is in maintenance mode.
   * @return the {@code inMaintenanceMode} flag of the response, wrapped by a
   *         {@link CompletableFuture}
   */
  @Override
  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
    return this.<Boolean> newMasterCaller()
      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
          resp -> resp.getInMaintenanceMode()))
      .call();
  }

  /**
   * Determines the compaction state of a table.
   * <p>
   * For {@code MOB} the active master is asked for the region info of the table's MOB region and
   * the compaction state of that response is returned ({@code NONE} when the response has none).
   * For {@code NORMAL} every located region that has a server, has a region and is not offline is
   * queried, and the per-region states are combined: any {@code MAJOR_AND_MINOR} region, or a mix
   * of {@code MAJOR} and {@code MINOR} regions, yields {@code MAJOR_AND_MINOR}.
   * @param tableName   the table to inspect
   * @param compactType {@code MOB} or {@code NORMAL}
   * @return the compaction state, wrapped by a {@link CompletableFuture}
   * @throws IllegalArgumentException synchronously, for any other compact type
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
    CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          // Ask the active master for the MOB region's info, including its compaction state.
          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
              GetRegionInfoResponse> adminCall(controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // Per-region callbacks add to this queue as they complete.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // If any region compaction state is MAJOR_AND_MINOR
                // the table compaction state is MAJOR_AND_MINOR, too.
                if (err2 != null) {
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If the future is not completed yet, derive the table state from all collected
              // region compaction states. err3 is not inspected here; a failed region lookup has
              // already failed the future in the per-region callback above.
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // Only reached with an open future when no MAJOR/MINOR mix was found.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }

  /**
   * Determines the compaction state of a single region by locating it and asking the hosting
   * server for its region info.
   * @param regionName name handed to {@code getRegionLocation}
   * @return the compaction state of the response, or {@code NONE} when the response has none;
   *         the future fails with {@code NoServerForRegionException} when the location has no
   *         server name
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();
    addListener(getRegionLocation(regionName), (location, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      ServerName serverName = location.getServerName();
      if (serverName == null) {
        future
          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
        return;
      }
      // The request uses the region name from the resolved location, not the name passed in.
      addListener(
        this.<GetRegionInfoResponse> newAdminCaller()
          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
            GetRegionInfoResponse> adminCall(controller, stub,
              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
                true),
              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
          .serverName(serverName).call(),
        (resp2, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            if (resp2.hasCompactionState()) {
              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
            } else {
              future.complete(CompactionState.NONE);
            }
          }
        });
    });
    return future;
  }

  /**
   * Asks the master for the last major compaction timestamp of a table.
   * @param tableName the table the request is built for
   * @return the response converted by {@code ProtobufUtil.toOptionalTimestamp}, wrapped by a
   *         {@link CompletableFuture}
   */
  @Override
  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
    return this.<Optional<Long>> newMasterCaller()
      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
          ProtobufUtil::toOptionalTimestamp))
      .call();
  }

  @Override
  public CompletableFuture<Optional<Long>>
    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
    addListener(getRegionInfo(regionName), (region, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      MajorCompactionTimestampForRegionRequest.Builder builder =
        MajorCompactionTimestampForRegionRequest.newBuilder();
      builder.setRegion(
        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
addListener(this.<Optional<Long>> newMasterCaller() 3134 .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest, 3135 MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(), 3136 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 3137 ProtobufUtil::toOptionalTimestamp)) 3138 .call(), (timestamp, err2) -> { 3139 if (err2 != null) { 3140 future.completeExceptionally(err2); 3141 } else { 3142 future.complete(timestamp); 3143 } 3144 }); 3145 }); 3146 return future; 3147 } 3148 3149 @Override 3150 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState, 3151 List<String> serverNamesList) { 3152 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>(); 3153 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> { 3154 if (err != null) { 3155 future.completeExceptionally(err); 3156 return; 3157 } 3158 // Accessed by multiple threads. 3159 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size()); 3160 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size()); 3161 serverNames.stream().forEach(serverName -> { 3162 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { 3163 if (err2 != null) { 3164 future.completeExceptionally(unwrapCompletionException(err2)); 3165 } else { 3166 serverStates.put(serverName, serverState); 3167 } 3168 })); 3169 }); 3170 addListener( 3171 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3172 (ret, err3) -> { 3173 if (!future.isCompletedExceptionally()) { 3174 if (err3 != null) { 3175 future.completeExceptionally(err3); 3176 } else { 3177 future.complete(serverStates); 3178 } 3179 } 3180 }); 3181 }); 3182 return future; 3183 } 3184 3185 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) { 3186 CompletableFuture<List<ServerName>> future 
= new CompletableFuture<>(); 3187 if (serverNamesList.isEmpty()) { 3188 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture = 3189 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)); 3190 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> { 3191 if (err != null) { 3192 future.completeExceptionally(err); 3193 } else { 3194 future.complete(clusterMetrics.getServersName()); 3195 } 3196 }); 3197 return future; 3198 } else { 3199 List<ServerName> serverList = new ArrayList<>(); 3200 for (String regionServerName : serverNamesList) { 3201 ServerName serverName = null; 3202 try { 3203 serverName = ServerName.valueOf(regionServerName); 3204 } catch (Exception e) { 3205 future.completeExceptionally( 3206 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName))); 3207 } 3208 if (serverName == null) { 3209 future.completeExceptionally( 3210 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName))); 3211 } else { 3212 serverList.add(serverName); 3213 } 3214 } 3215 future.complete(serverList); 3216 } 3217 return future; 3218 } 3219 3220 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) { 3221 return this.<Boolean> newAdminCaller().serverName(serverName) 3222 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse, 3223 Boolean> adminCall(controller, stub, 3224 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), 3225 (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState())) 3226 .call(); 3227 } 3228 3229 @Override 3230 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) { 3231 return this.<Boolean> newMasterCaller() 3232 .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse, 3233 Boolean> call(controller, stub, 3234 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs), 3235 (s, c, req, done) -> 
s.setBalancerRunning(c, req, done), 3236 (resp) -> resp.getPrevBalanceValue())) 3237 .call(); 3238 } 3239 3240 @Override 3241 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) { 3242 return this.<BalanceResponse> newMasterCaller() 3243 .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, 3244 BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request), 3245 (s, c, req, done) -> s.balance(c, req, done), 3246 (resp) -> ProtobufUtil.toBalanceResponse(resp))) 3247 .call(); 3248 } 3249 3250 @Override 3251 public CompletableFuture<Boolean> isBalancerEnabled() { 3252 return this.<Boolean> newMasterCaller() 3253 .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, 3254 Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3255 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3256 .call(); 3257 } 3258 3259 @Override 3260 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3261 return this.<Boolean> newMasterCaller() 3262 .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse, 3263 Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), 3264 (s, c, req, done) -> s.setNormalizerRunning(c, req, done), 3265 (resp) -> resp.getPrevNormalizerValue())) 3266 .call(); 3267 } 3268 3269 @Override 3270 public CompletableFuture<Boolean> isNormalizerEnabled() { 3271 return this.<Boolean> newMasterCaller() 3272 .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, 3273 Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3274 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3275 .call(); 3276 } 3277 3278 @Override 3279 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) { 3280 return 
normalize(RequestConverter.buildNormalizeRequest(ntfp)); 3281 } 3282 3283 private CompletableFuture<Boolean> normalize(NormalizeRequest request) { 3284 return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub, 3285 request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call(); 3286 } 3287 3288 @Override 3289 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3290 return this.<Boolean> newMasterCaller() 3291 .action((controller, stub) -> this.<SetCleanerChoreRunningRequest, 3292 SetCleanerChoreRunningResponse, Boolean> call(controller, stub, 3293 RequestConverter.buildSetCleanerChoreRunningRequest(enabled), 3294 (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done), 3295 (resp) -> resp.getPrevValue())) 3296 .call(); 3297 } 3298 3299 @Override 3300 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3301 return this.<Boolean> newMasterCaller() 3302 .action( 3303 (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, 3304 Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), 3305 (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3306 .call(); 3307 } 3308 3309 @Override 3310 public CompletableFuture<Boolean> runCleanerChore() { 3311 return this.<Boolean> newMasterCaller() 3312 .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse, 3313 Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(), 3314 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3315 (resp) -> resp.getCleanerChoreRan())) 3316 .call(); 3317 } 3318 3319 @Override 3320 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3321 return this.<Boolean> newMasterCaller() 3322 .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, 3323 Boolean> call(controller, stub, 
RequestConverter.buildEnableCatalogJanitorRequest(enabled), 3324 (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue())) 3325 .call(); 3326 } 3327 3328 @Override 3329 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3330 return this.<Boolean> newMasterCaller() 3331 .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest, 3332 IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub, 3333 RequestConverter.buildIsCatalogJanitorEnabledRequest(), 3334 (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue())) 3335 .call(); 3336 } 3337 3338 @Override 3339 public CompletableFuture<Integer> runCatalogJanitor() { 3340 return this.<Integer> newMasterCaller() 3341 .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, 3342 Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(), 3343 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3344 .call(); 3345 } 3346 3347 @Override 3348 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3349 ServiceCaller<S, R> callable) { 3350 MasterCoprocessorRpcChannelImpl channel = 3351 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3352 S stub = stubMaker.apply(channel); 3353 CompletableFuture<R> future = new CompletableFuture<>(); 3354 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3355 callable.call(stub, controller, resp -> { 3356 if (controller.failed()) { 3357 future.completeExceptionally(controller.getFailed()); 3358 } else { 3359 future.complete(resp); 3360 } 3361 }); 3362 return future; 3363 } 3364 3365 @Override 3366 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3367 ServiceCaller<S, R> callable, ServerName serverName) { 3368 RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl( 3369 
this.<Message> newServerCaller().serverName(serverName)); 3370 S stub = stubMaker.apply(channel); 3371 CompletableFuture<R> future = new CompletableFuture<>(); 3372 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3373 callable.call(stub, controller, resp -> { 3374 if (controller.failed()) { 3375 future.completeExceptionally(controller.getFailed()); 3376 } else { 3377 future.complete(resp); 3378 } 3379 }); 3380 return future; 3381 } 3382 3383 @Override 3384 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3385 return this.<List<ServerName>> newMasterCaller() 3386 .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse, 3387 List<ServerName>> call(controller, stub, 3388 RequestConverter.buildClearDeadServersRequest(servers), 3389 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3390 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3391 .call(); 3392 } 3393 3394 private <T> ServerRequestCallerBuilder<T> newServerCaller() { 3395 return this.connection.callerFactory.<T> serverRequest() 3396 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3397 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3398 .pause(pauseNs, TimeUnit.NANOSECONDS) 3399 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 3400 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 3401 } 3402 3403 @Override 3404 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3405 if (tableName == null) { 3406 return failedFuture(new IllegalArgumentException("Table name is null")); 3407 } 3408 CompletableFuture<Void> future = new CompletableFuture<>(); 3409 addListener(tableExists(tableName), (exist, err) -> { 3410 if (err != null) { 3411 future.completeExceptionally(err); 3412 return; 3413 } 3414 if (!exist) { 3415 future.completeExceptionally(new TableNotFoundException( 3416 "Table '" + tableName.getNameAsString() + "' 
does not exists.")); 3417 return; 3418 } 3419 addListener(getTableSplits(tableName), (splits, err1) -> { 3420 if (err1 != null) { 3421 future.completeExceptionally(err1); 3422 } else { 3423 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3424 if (err2 != null) { 3425 future.completeExceptionally(err2); 3426 } else { 3427 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3428 if (err3 != null) { 3429 future.completeExceptionally(err3); 3430 } else { 3431 future.complete(result3); 3432 } 3433 }); 3434 } 3435 }); 3436 } 3437 }); 3438 }); 3439 return future; 3440 } 3441 3442 @Override 3443 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3444 if (tableName == null) { 3445 return failedFuture(new IllegalArgumentException("Table name is null")); 3446 } 3447 CompletableFuture<Void> future = new CompletableFuture<>(); 3448 addListener(tableExists(tableName), (exist, err) -> { 3449 if (err != null) { 3450 future.completeExceptionally(err); 3451 return; 3452 } 3453 if (!exist) { 3454 future.completeExceptionally(new TableNotFoundException( 3455 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3456 return; 3457 } 3458 addListener(setTableReplication(tableName, false), (result, err2) -> { 3459 if (err2 != null) { 3460 future.completeExceptionally(err2); 3461 } else { 3462 future.complete(result); 3463 } 3464 }); 3465 }); 3466 return future; 3467 } 3468 3469 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3470 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3471 addListener( 3472 getRegions(tableName).thenApply(regions -> regions.stream() 3473 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())), 3474 (regions, err2) -> { 3475 if (err2 != null) { 3476 future.completeExceptionally(err2); 3477 return; 3478 } 3479 if (regions.size() == 1) { 3480 future.complete(null); 3481 } else { 3482 byte[][] splits = new 
byte[regions.size() - 1][]; 3483 for (int i = 1; i < regions.size(); i++) { 3484 splits[i - 1] = regions.get(i).getStartKey(); 3485 } 3486 future.complete(splits); 3487 } 3488 }); 3489 return future; 3490 } 3491 3492 /** 3493 * Connect to peer and check the table descriptor on peer: 3494 * <ol> 3495 * <li>Create the same table on peer when not exist.</li> 3496 * <li>Throw an exception if the table already has replication enabled on any of the column 3497 * families.</li> 3498 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3499 * </ol> 3500 * @param tableName name of the table to sync to the peer 3501 * @param splits table split keys 3502 */ 3503 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3504 byte[][] splits) { 3505 CompletableFuture<Void> future = new CompletableFuture<>(); 3506 addListener(listReplicationPeers(), (peers, err) -> { 3507 if (err != null) { 3508 future.completeExceptionally(err); 3509 return; 3510 } 3511 if (peers == null || peers.size() <= 0) { 3512 future.completeExceptionally( 3513 new IllegalArgumentException("Found no peer cluster for replication.")); 3514 return; 3515 } 3516 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3517 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3518 .forEach(peer -> { 3519 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3520 }); 3521 addListener( 3522 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3523 (result, err2) -> { 3524 if (err2 != null) { 3525 future.completeExceptionally(err2); 3526 } else { 3527 future.complete(result); 3528 } 3529 }); 3530 }); 3531 return future; 3532 } 3533 3534 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3535 ReplicationPeerDescription peer) { 3536 Configuration peerConf = null; 3537 try { 3538 peerConf = 3539 
ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3540 } catch (IOException e) { 3541 return failedFuture(e); 3542 } 3543 CompletableFuture<Void> future = new CompletableFuture<>(); 3544 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3545 if (err != null) { 3546 future.completeExceptionally(err); 3547 return; 3548 } 3549 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3550 if (err1 != null) { 3551 future.completeExceptionally(err1); 3552 return; 3553 } 3554 AsyncAdmin peerAdmin = conn.getAdmin(); 3555 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3556 if (err2 != null) { 3557 future.completeExceptionally(err2); 3558 return; 3559 } 3560 if (!exist) { 3561 CompletableFuture<Void> createTableFuture = null; 3562 if (splits == null) { 3563 createTableFuture = peerAdmin.createTable(tableDesc); 3564 } else { 3565 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3566 } 3567 addListener(createTableFuture, (result, err3) -> { 3568 if (err3 != null) { 3569 future.completeExceptionally(err3); 3570 } else { 3571 future.complete(result); 3572 } 3573 }); 3574 } else { 3575 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3576 (result, err4) -> { 3577 if (err4 != null) { 3578 future.completeExceptionally(err4); 3579 } else { 3580 future.complete(result); 3581 } 3582 }); 3583 } 3584 }); 3585 }); 3586 }); 3587 return future; 3588 } 3589 3590 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3591 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3592 CompletableFuture<Void> future = new CompletableFuture<>(); 3593 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3594 if (err != null) { 3595 future.completeExceptionally(err); 3596 return; 3597 } 3598 if (peerTableDesc == null) { 3599 future.completeExceptionally( 3600 new 
IllegalArgumentException("Failed to get table descriptor for table " 3601 + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3602 return; 3603 } 3604 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3605 future.completeExceptionally(new IllegalArgumentException( 3606 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() 3607 + ", but the table descriptors are not same when compared with source cluster." 3608 + " Thus can not enable the table's replication switch.")); 3609 return; 3610 } 3611 future.complete(null); 3612 }); 3613 return future; 3614 } 3615 3616 /** 3617 * Set the table's replication switch if the table's replication switch is already not set. 3618 * @param tableName name of the table 3619 * @param enableRep is replication switch enable or disable 3620 */ 3621 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3622 CompletableFuture<Void> future = new CompletableFuture<>(); 3623 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3624 if (err != null) { 3625 future.completeExceptionally(err); 3626 return; 3627 } 3628 if (!tableDesc.matchReplicationScope(enableRep)) { 3629 int scope = 3630 enableRep ? 
HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3631 TableDescriptor newTableDesc = 3632 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3633 addListener(modifyTable(newTableDesc), (result, err2) -> { 3634 if (err2 != null) { 3635 future.completeExceptionally(err2); 3636 } else { 3637 future.complete(result); 3638 } 3639 }); 3640 } else { 3641 future.complete(null); 3642 } 3643 }); 3644 return future; 3645 } 3646 3647 @Override 3648 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3649 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3650 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3651 if (err != null) { 3652 future.completeExceptionally(err); 3653 return; 3654 } 3655 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3656 locations.stream().filter(l -> l.getRegion() != null) 3657 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3658 .collect(Collectors.groupingBy(l -> l.getServerName(), 3659 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3660 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3661 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3662 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3663 futures 3664 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3665 if (err2 != null) { 3666 future.completeExceptionally(unwrapCompletionException(err2)); 3667 } else { 3668 aggregator.append(stats); 3669 } 3670 })); 3671 } 3672 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3673 (ret, err3) -> { 3674 if (err3 != null) { 3675 future.completeExceptionally(unwrapCompletionException(err3)); 3676 } else { 3677 future.complete(aggregator.sum()); 3678 } 3679 }); 3680 }); 3681 return future; 3682 } 
3683 3684 @Override 3685 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName, 3686 boolean preserveSplits) { 3687 CompletableFuture<Void> future = new CompletableFuture<>(); 3688 addListener(tableExists(tableName), (exist, err) -> { 3689 if (err != null) { 3690 future.completeExceptionally(err); 3691 return; 3692 } 3693 if (!exist) { 3694 future.completeExceptionally(new TableNotFoundException(tableName)); 3695 return; 3696 } 3697 addListener(tableExists(newTableName), (exist1, err1) -> { 3698 if (err1 != null) { 3699 future.completeExceptionally(err1); 3700 return; 3701 } 3702 if (exist1) { 3703 future.completeExceptionally(new TableExistsException(newTableName)); 3704 return; 3705 } 3706 addListener(getDescriptor(tableName), (tableDesc, err2) -> { 3707 if (err2 != null) { 3708 future.completeExceptionally(err2); 3709 return; 3710 } 3711 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc); 3712 if (preserveSplits) { 3713 addListener(getTableSplits(tableName), (splits, err3) -> { 3714 if (err3 != null) { 3715 future.completeExceptionally(err3); 3716 } else { 3717 addListener( 3718 splits != null ? 
createTable(newTableDesc, splits) : createTable(newTableDesc), 3719 (result, err4) -> { 3720 if (err4 != null) { 3721 future.completeExceptionally(err4); 3722 } else { 3723 future.complete(result); 3724 } 3725 }); 3726 } 3727 }); 3728 } else { 3729 addListener(createTable(newTableDesc), (result, err5) -> { 3730 if (err5 != null) { 3731 future.completeExceptionally(err5); 3732 } else { 3733 future.complete(result); 3734 } 3735 }); 3736 } 3737 }); 3738 }); 3739 }); 3740 return future; 3741 } 3742 3743 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName, 3744 List<RegionInfo> hris) { 3745 return this.<CacheEvictionStats> newAdminCaller() 3746 .action((controller, stub) -> this.<ClearRegionBlockCacheRequest, 3747 ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub, 3748 RequestConverter.buildClearRegionBlockCacheRequest(hris), 3749 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done), 3750 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) 3751 .serverName(serverName).call(); 3752 } 3753 3754 @Override 3755 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) { 3756 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3757 .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, 3758 Boolean> call(controller, stub, 3759 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(), 3760 (s, c, req, done) -> s.switchRpcThrottle(c, req, done), 3761 resp -> resp.getPreviousRpcThrottleEnabled())) 3762 .call(); 3763 return future; 3764 } 3765 3766 @Override 3767 public CompletableFuture<Boolean> isRpcThrottleEnabled() { 3768 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3769 .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, 3770 Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(), 3771 (s, c, req, done) -> 
s.isRpcThrottleEnabled(c, req, done), 3772 resp -> resp.getRpcThrottleEnabled())) 3773 .call(); 3774 return future; 3775 } 3776 3777 @Override 3778 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) { 3779 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller() 3780 .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest, 3781 SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub, 3782 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable) 3783 .build(), 3784 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done), 3785 resp -> resp.getPreviousExceedThrottleQuotaEnabled())) 3786 .call(); 3787 return future; 3788 } 3789 3790 @Override 3791 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() { 3792 return this.<Map<TableName, Long>> newMasterCaller() 3793 .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest, 3794 GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub, 3795 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(), 3796 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done), 3797 resp -> resp.getSizesList().stream().collect(Collectors 3798 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize)))) 3799 .call(); 3800 } 3801 3802 @Override 3803 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> 3804 getRegionServerSpaceQuotaSnapshots(ServerName serverName) { 3805 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller() 3806 .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest, 3807 GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, 3808 stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(), 3809 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done), 3810 resp -> resp.getSnapshotsList().stream() 3811 .collect(Collectors.toMap(snapshot -> 
ProtobufUtil.toTableName(snapshot.getTableName()), 3812 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot()))))) 3813 .serverName(serverName).call(); 3814 } 3815 3816 private CompletableFuture<SpaceQuotaSnapshot> 3817 getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) { 3818 return this.<SpaceQuotaSnapshot> newMasterCaller() 3819 .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse, 3820 SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(), 3821 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter)) 3822 .call(); 3823 } 3824 3825 @Override 3826 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) { 3827 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream() 3828 .filter(s -> s.getNamespace().equals(namespace)).findFirst() 3829 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3830 } 3831 3832 @Override 3833 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) { 3834 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName); 3835 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream() 3836 .filter(s -> s.getTableName().equals(protoTableName)).findFirst() 3837 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null)); 3838 } 3839 3840 @Override 3841 public CompletableFuture<Void> grant(UserPermission userPermission, 3842 boolean mergeExistingPermissions) { 3843 return this.<Void> newMasterCaller() 3844 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub, 3845 ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions), 3846 (s, c, req, done) -> s.grant(c, req, done), resp -> null)) 3847 .call(); 3848 } 3849 3850 @Override 3851 public CompletableFuture<Void> 
revoke(UserPermission userPermission) { 3852 return this.<Void> newMasterCaller() 3853 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller, 3854 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission), 3855 (s, c, req, done) -> s.revoke(c, req, done), resp -> null)) 3856 .call(); 3857 } 3858 3859 @Override 3860 public CompletableFuture<List<UserPermission>> 3861 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) { 3862 return this.<List<UserPermission>> newMasterCaller() 3863 .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, 3864 GetUserPermissionsResponse, List<UserPermission>> call(controller, stub, 3865 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest), 3866 (s, c, req, done) -> s.getUserPermissions(c, req, done), 3867 resp -> resp.getUserPermissionList().stream() 3868 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm)) 3869 .collect(Collectors.toList()))) 3870 .call(); 3871 } 3872 3873 @Override 3874 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName, 3875 List<Permission> permissions) { 3876 return this.<List<Boolean>> newMasterCaller() 3877 .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse, 3878 List<Boolean>> call(controller, stub, 3879 ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions), 3880 (s, c, req, done) -> s.hasUserPermissions(c, req, done), 3881 resp -> resp.getHasUserPermissionList())) 3882 .call(); 3883 } 3884 3885 @Override 3886 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) { 3887 return this.<Boolean> newMasterCaller() 3888 .action((controller, stub) -> this.call(controller, stub, 3889 RequestConverter.buildSetSnapshotCleanupRequest(on, sync), 3890 MasterService.Interface::switchSnapshotCleanup, 3891 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)) 3892 .call(); 
3893 } 3894 3895 @Override 3896 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() { 3897 return this.<Boolean> newMasterCaller() 3898 .action((controller, stub) -> this.call(controller, stub, 3899 RequestConverter.buildIsSnapshotCleanupEnabledRequest(), 3900 MasterService.Interface::isSnapshotCleanupEnabled, 3901 IsSnapshotCleanupEnabledResponse::getEnabled)) 3902 .call(); 3903 } 3904 3905 private CompletableFuture<List<LogEntry>> getSlowLogResponses( 3906 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit, 3907 final String logType) { 3908 if (CollectionUtils.isEmpty(serverNames)) { 3909 return CompletableFuture.completedFuture(Collections.emptyList()); 3910 } 3911 return CompletableFuture.supplyAsync(() -> serverNames 3912 .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName, 3913 filterParams, limit, logType)) 3914 .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())); 3915 } 3916 3917 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName, 3918 Map<String, Object> filterParams, int limit, String logType) { 3919 return this.<List<LogEntry>> newAdminCaller() 3920 .action((controller, stub) -> this.adminCall(controller, stub, 3921 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), 3922 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) 3923 .serverName(serverName).call(); 3924 } 3925 3926 @Override 3927 public CompletableFuture<List<Boolean>> 3928 clearSlowLogResponses(@Nullable Set<ServerName> serverNames) { 3929 if (CollectionUtils.isEmpty(serverNames)) { 3930 return CompletableFuture.completedFuture(Collections.emptyList()); 3931 } 3932 List<CompletableFuture<Boolean>> clearSlowLogResponseList = 3933 serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList()); 3934 return convertToFutureOfList(clearSlowLogResponseList); 3935 } 3936 3937 private 
CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) { 3938 return this.<Boolean> newAdminCaller() 3939 .action((controller, stub) -> this.adminCall(controller, stub, 3940 RequestConverter.buildClearSlowLogResponseRequest(), 3941 AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload)) 3942 .serverName(serverName).call(); 3943 } 3944 3945 private static <T> CompletableFuture<List<T>> 3946 convertToFutureOfList(List<CompletableFuture<T>> futures) { 3947 CompletableFuture<Void> allDoneFuture = 3948 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 3949 return allDoneFuture 3950 .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); 3951 } 3952 3953 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) { 3954 return this.<List<LogEntry>> newMasterCaller() 3955 .action((controller, stub) -> this.call(controller, stub, 3956 ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries, 3957 ProtobufUtil::toBalancerDecisionResponse)) 3958 .call(); 3959 } 3960 3961 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) { 3962 return this.<List<LogEntry>> newMasterCaller() 3963 .action((controller, stub) -> this.call(controller, stub, 3964 ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries, 3965 ProtobufUtil::toBalancerRejectionResponse)) 3966 .call(); 3967 } 3968 3969 @Override 3970 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, 3971 String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { 3972 if (logType == null || serverType == null) { 3973 throw new IllegalArgumentException("logType and/or serverType cannot be empty"); 3974 } 3975 switch (logType) { 3976 case "SLOW_LOG": 3977 case "LARGE_LOG": 3978 if (ServerType.MASTER.equals(serverType)) { 3979 throw new 
IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); 3980 } 3981 return getSlowLogResponses(filterParams, serverNames, limit, logType); 3982 case "BALANCER_DECISION": 3983 if (ServerType.REGION_SERVER.equals(serverType)) { 3984 throw new IllegalArgumentException( 3985 "Balancer Decision logs are not maintained by HRegionServer"); 3986 } 3987 return getBalancerDecisions(limit); 3988 case "BALANCER_REJECTION": 3989 if (ServerType.REGION_SERVER.equals(serverType)) { 3990 throw new IllegalArgumentException( 3991 "Balancer Rejection logs are not maintained by HRegionServer"); 3992 } 3993 return getBalancerRejections(limit); 3994 default: 3995 return CompletableFuture.completedFuture(Collections.emptyList()); 3996 } 3997 } 3998 3999 @Override 4000 public CompletableFuture<Void> flushMasterStore() { 4001 FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder(); 4002 return this.<Void> newMasterCaller() 4003 .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse, 4004 Void> call(controller, stub, request.build(), 4005 (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)) 4006 .call(); 4007 } 4008}