001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024import com.google.protobuf.Message;
025import com.google.protobuf.RpcChannel;
026import edu.umd.cs.findbugs.annotations.Nullable;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MetaTableAccessor;
060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
061import org.apache.hadoop.hbase.NamespaceDescriptor;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.UnknownRegionException;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.Scan.ReadType;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.DeserializationException;
080import org.apache.hadoop.hbase.ipc.HBaseRpcController;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
093import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
192    .IsSnapshotCleanupEnabledResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
244    .SetSnapshotCleanupResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
285import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
286import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
287import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
288import org.apache.hadoop.hbase.util.Bytes;
289import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
290import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
291import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
292import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
293import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
294import org.apache.hbase.thirdparty.io.netty.util.Timeout;
295import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
296import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
297import org.apache.yetus.audience.InterfaceAudience;
298import org.slf4j.Logger;
299import org.slf4j.LoggerFactory;
300
301/**
302 * The implementation of AsyncAdmin.
303 * <p>
304 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
305 * be finished inside the rpc framework thread, which means that the callbacks registered to the
306 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
307 * this class should not try to do time consuming tasks in the callbacks.
308 * @since 2.0.0
309 * @see AsyncHBaseAdmin
310 * @see AsyncConnection#getAdmin()
311 * @see AsyncConnection#getAdminBuilder()
312 */
313@InterfaceAudience.Private
314class RawAsyncHBaseAdmin implements AsyncAdmin {
315  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
316
317  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
318
319  private final AsyncConnectionImpl connection;
320
321  private final HashedWheelTimer retryTimer;
322
323  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
324
325  private final long rpcTimeoutNs;
326
327  private final long operationTimeoutNs;
328
329  private final long pauseNs;
330
331  private final long pauseForCQTBENs;
332
333  private final int maxAttempts;
334
335  private final int startLogErrorsCnt;
336
337  private final NonceGenerator ng;
338
339  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
340      AsyncAdminBuilderBase builder) {
341    this.connection = connection;
342    this.retryTimer = retryTimer;
343    this.metaTable = connection.getTable(META_TABLE_NAME);
344    this.rpcTimeoutNs = builder.rpcTimeoutNs;
345    this.operationTimeoutNs = builder.operationTimeoutNs;
346    this.pauseNs = builder.pauseNs;
347    if (builder.pauseForCQTBENs < builder.pauseNs) {
348      LOG.warn(
349        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
350          " the normal pause value {} ms, use the greater one instead",
351        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
352        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
353      this.pauseForCQTBENs = builder.pauseNs;
354    } else {
355      this.pauseForCQTBENs = builder.pauseForCQTBENs;
356    }
357    this.maxAttempts = builder.maxAttempts;
358    this.startLogErrorsCnt = builder.startLogErrorsCnt;
359    this.ng = connection.getNonceGenerator();
360  }
361
362  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
363    return this.connection.callerFactory.<T> masterRequest()
364      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
365      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
366      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
367      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
368  }
369
370  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
371    return this.connection.callerFactory.<T> adminRequest()
372      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
373      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
374      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
375      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
376  }
377
378  @FunctionalInterface
379  private interface MasterRpcCall<RESP, REQ> {
380    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
381        RpcCallback<RESP> done);
382  }
383
384  @FunctionalInterface
385  private interface AdminRpcCall<RESP, REQ> {
386    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
387        RpcCallback<RESP> done);
388  }
389
390  @FunctionalInterface
391  private interface Converter<D, S> {
392    D convert(S src) throws IOException;
393  }
394
395  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
396      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
397      Converter<RESP, PRESP> respConverter) {
398    CompletableFuture<RESP> future = new CompletableFuture<>();
399    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
400
401      @Override
402      public void run(PRESP resp) {
403        if (controller.failed()) {
404          future.completeExceptionally(controller.getFailed());
405        } else {
406          try {
407            future.complete(respConverter.convert(resp));
408          } catch (IOException e) {
409            future.completeExceptionally(e);
410          }
411        }
412      }
413    });
414    return future;
415  }
416
417  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
418      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
419      Converter<RESP, PRESP> respConverter) {
420    CompletableFuture<RESP> future = new CompletableFuture<>();
421    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
422
423      @Override
424      public void run(PRESP resp) {
425        if (controller.failed()) {
426          future.completeExceptionally(controller.getFailed());
427        } else {
428          try {
429            future.complete(respConverter.convert(resp));
430          } catch (IOException e) {
431            future.completeExceptionally(e);
432          }
433        }
434      }
435    });
436    return future;
437  }
438
439  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
440      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
441      ProcedureBiConsumer consumer) {
442    return procedureCall(b -> {
443    }, preq, rpcCall, respConverter, consumer);
444  }
445
446  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
447      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
448      ProcedureBiConsumer consumer) {
449    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
450  }
451
452  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
453      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
454      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
455      ProcedureBiConsumer consumer) {
456    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
457        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
458    prioritySetter.accept(builder);
459    CompletableFuture<Long> procFuture = builder.call();
460    CompletableFuture<Void> future = waitProcedureResult(procFuture);
461    addListener(future, consumer);
462    return future;
463  }
464
465  @FunctionalInterface
466  private interface TableOperator {
467    CompletableFuture<Void> operate(TableName table);
468  }
469
470  @Override
471  public CompletableFuture<Boolean> tableExists(TableName tableName) {
472    if (TableName.isMetaTableName(tableName)) {
473      return CompletableFuture.completedFuture(true);
474    }
475    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
476  }
477
478  @Override
479  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
480    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
481      includeSysTables));
482  }
483
484  /**
485   * {@link #listTableDescriptors(boolean)}
486   */
487  @Override
488  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
489      boolean includeSysTables) {
490    Preconditions.checkNotNull(pattern,
491      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
492    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
493      includeSysTables));
494  }
495
496  @Override
497  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
498    Preconditions.checkNotNull(tableNames,
499      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
500    if (tableNames.isEmpty()) {
501      return CompletableFuture.completedFuture(Collections.emptyList());
502    }
503    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
504  }
505
506  private CompletableFuture<List<TableDescriptor>>
507      getTableDescriptors(GetTableDescriptorsRequest request) {
508    return this.<List<TableDescriptor>> newMasterCaller()
509        .action((controller, stub) -> this
510            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
511              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
512              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
513        .call();
514  }
515
516  @Override
517  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
518    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
519  }
520
521  @Override
522  public CompletableFuture<List<TableName>>
523      listTableNames(Pattern pattern, boolean includeSysTables) {
524    Preconditions.checkNotNull(pattern,
525        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
526    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
527  }
528
529  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
530    return this
531        .<List<TableName>> newMasterCaller()
532        .action(
533          (controller, stub) -> this
534              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
535                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
536                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
537  }
538
539  @Override
540  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
541    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
542        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
543        List<TableDescriptor>> call(
544          controller, stub,
545          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
546          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
547          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
548        .call();
549  }
550
551  @Override
552  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
553    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
554        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
555        List<TableName>> call(
556          controller, stub,
557          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
558          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
559          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
560        .call();
561  }
562
563  @Override
564  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
565    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
566    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
567      .action((controller, stub) -> this
568        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
569          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
570          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
571          (resp) -> resp.getTableSchemaList()))
572      .call(), (tableSchemas, error) -> {
573        if (error != null) {
574          future.completeExceptionally(error);
575          return;
576        }
577        if (!tableSchemas.isEmpty()) {
578          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
579        } else {
580          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
581        }
582      });
583    return future;
584  }
585
586  @Override
587  public CompletableFuture<Void> createTable(TableDescriptor desc) {
588    return createTable(desc.getTableName(),
589      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
590  }
591
592  @Override
593  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
594      int numRegions) {
595    try {
596      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
597    } catch (IllegalArgumentException e) {
598      return failedFuture(e);
599    }
600  }
601
602  @Override
603  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
604    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
605        + " use createTable(TableDescriptor) instead");
606    try {
607      verifySplitKeys(splitKeys);
608      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
609        splitKeys, ng.getNonceGroup(), ng.newNonce()));
610    } catch (IllegalArgumentException e) {
611      return failedFuture(e);
612    }
613  }
614
615  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
616    Preconditions.checkNotNull(tableName, "table name is null");
617    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
618      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
619      new CreateTableProcedureBiConsumer(tableName));
620  }
621
622  @Override
623  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
624    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
625      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
626        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
627      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
628  }
629
630  @Override
631  public CompletableFuture<Void> deleteTable(TableName tableName) {
632    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
633      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
634      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
635      new DeleteTableProcedureBiConsumer(tableName));
636  }
637
638  @Override
639  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
640    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
641      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
642        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
643      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
644  }
645
646  @Override
647  public CompletableFuture<Void> enableTable(TableName tableName) {
648    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
649      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
650      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
651      new EnableTableProcedureBiConsumer(tableName));
652  }
653
654  @Override
655  public CompletableFuture<Void> disableTable(TableName tableName) {
656    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
657      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
658      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
659      new DisableTableProcedureBiConsumer(tableName));
660  }
661
662  /**
663   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
664   * using passed parameters. Sets error or boolean result ('true' if table matches
665   * the passed-in targetState).
666   */
667  private static CompletableFuture<Boolean> completeCheckTableState(
668      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
669      TableState.State targetState, TableName tableName) {
670    if (error != null) {
671      future.completeExceptionally(error);
672    } else {
673      if (tableState != null) {
674        future.complete(tableState.inStates(targetState));
675      } else {
676        future.completeExceptionally(new TableNotFoundException(tableName));
677      }
678    }
679    return future;
680  }
681
682  @Override
683  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
684    if (TableName.isMetaTableName(tableName)) {
685      return CompletableFuture.completedFuture(true);
686    }
687    CompletableFuture<Boolean> future = new CompletableFuture<>();
688    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
689      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
690        TableState.State.ENABLED, tableName);
691    });
692    return future;
693  }
694
695  @Override
696  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
697    if (TableName.isMetaTableName(tableName)) {
698      return CompletableFuture.completedFuture(false);
699    }
700    CompletableFuture<Boolean> future = new CompletableFuture<>();
701    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
702      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
703        TableState.State.DISABLED, tableName);
704    });
705    return future;
706  }
707
708  @Override
709  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
710    return isTableAvailable(tableName, Optional.empty());
711  }
712
713  @Override
714  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
715    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
716        + " use isTableAvailable(TableName) instead");
717    return isTableAvailable(tableName, Optional.of(splitKeys));
718  }
719
720  private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
721      Optional<byte[][]> splitKeys) {
722    if (TableName.isMetaTableName(tableName)) {
723      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
724        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
725    }
726    CompletableFuture<Boolean> future = new CompletableFuture<>();
727    addListener(isTableEnabled(tableName), (enabled, error) -> {
728      if (error != null) {
729        if (error instanceof TableNotFoundException) {
730          future.complete(false);
731        } else {
732          future.completeExceptionally(error);
733        }
734        return;
735      }
736      if (!enabled) {
737        future.complete(false);
738      } else {
739        addListener(
740          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
741          (locations, error1) -> {
742            if (error1 != null) {
743              future.completeExceptionally(error1);
744              return;
745            }
746            List<HRegionLocation> notDeployedRegions = locations.stream()
747              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
748            if (notDeployedRegions.size() > 0) {
749              if (LOG.isDebugEnabled()) {
750                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
751              }
752              future.complete(false);
753              return;
754            }
755
756            Optional<Boolean> available =
757              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
758            future.complete(available.orElse(true));
759          });
760      }
761    });
762    return future;
763  }
764
765  private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
766    int regionCount = 0;
767    for (HRegionLocation location : locations) {
768      RegionInfo info = location.getRegion();
769      if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
770        regionCount++;
771        continue;
772      }
773      for (byte[] splitKey : splitKeys) {
774        // Just check if the splitkey is available
775        if (Bytes.equals(info.getStartKey(), splitKey)) {
776          regionCount++;
777          break;
778        }
779      }
780    }
781    return regionCount == splitKeys.length + 1;
782  }
783
784  @Override
785  public CompletableFuture<Void> addColumnFamily(
786      TableName tableName, ColumnFamilyDescriptor columnFamily) {
787    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
788      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
789        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
790      new AddColumnFamilyProcedureBiConsumer(tableName));
791  }
792
793  @Override
794  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
795    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
796      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
797        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
798      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
799  }
800
801  @Override
802  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
803      ColumnFamilyDescriptor columnFamily) {
804    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
805      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
806        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
807      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
808  }
809
810  @Override
811  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
812    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
813      RequestConverter.buildCreateNamespaceRequest(descriptor),
814      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
815      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
816  }
817
818  @Override
819  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
820    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
821      RequestConverter.buildModifyNamespaceRequest(descriptor),
822      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
823      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
824  }
825
826  @Override
827  public CompletableFuture<Void> deleteNamespace(String name) {
828    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
829      RequestConverter.buildDeleteNamespaceRequest(name),
830      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
831      new DeleteNamespaceProcedureBiConsumer(name));
832  }
833
834  @Override
835  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
836    return this
837        .<NamespaceDescriptor> newMasterCaller()
838        .action(
839          (controller, stub) -> this
840              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
841                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
842                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
843                      -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
844  }
845
846  @Override
847  public CompletableFuture<List<String>> listNamespaces() {
848    return this
849        .<List<String>> newMasterCaller()
850        .action(
851          (controller, stub) -> this
852              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
853                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
854                  done) -> s.listNamespaces(c, req, done),
855                (resp) -> resp.getNamespaceNameList())).call();
856  }
857
858  @Override
859  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
860    return this
861        .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
862              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
863                  List<NamespaceDescriptor>> call(controller, stub,
864                  ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
865                      s.listNamespaceDescriptors(c, req, done),
866                    (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
867  }
868
869  @Override
870  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
871    return this.<List<RegionInfo>> newAdminCaller()
872        .action((controller, stub) -> this
873            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
874              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
875              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
876              resp -> ProtobufUtil.getRegionInfos(resp)))
877        .serverName(serverName).call();
878  }
879
880  @Override
881  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
882    if (tableName.equals(META_TABLE_NAME)) {
883      return connection.registry.getMetaRegionLocations()
884        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
885          .collect(Collectors.toList()));
886    } else {
887      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
888        .thenApply(
889          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
890    }
891  }
892  @Override
893  public CompletableFuture<Void> flush(TableName tableName) {
894    return flush(tableName, null);
895  }
896
897  @Override
898  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
899    CompletableFuture<Void> future = new CompletableFuture<>();
900    addListener(tableExists(tableName), (exists, err) -> {
901      if (err != null) {
902        future.completeExceptionally(err);
903      } else if (!exists) {
904        future.completeExceptionally(new TableNotFoundException(tableName));
905      } else {
906        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
907          if (err2 != null) {
908            future.completeExceptionally(err2);
909          } else if (!tableEnabled) {
910            future.completeExceptionally(new TableNotEnabledException(tableName));
911          } else {
912            Map<String, String> props = new HashMap<>();
913            if (columnFamily != null) {
914              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
915            }
916            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
917              props), (ret, err3) -> {
918                if (err3 != null) {
919                  future.completeExceptionally(err3);
920                } else {
921                  future.complete(ret);
922                }
923              });
924          }
925        });
926      }
927    });
928    return future;
929  }
930
931  @Override
932  public CompletableFuture<Void> flushRegion(byte[] regionName) {
933    return flushRegion(regionName, null);
934  }
935
936  @Override
937  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
938    CompletableFuture<Void> future = new CompletableFuture<>();
939    addListener(getRegionLocation(regionName), (location, err) -> {
940      if (err != null) {
941        future.completeExceptionally(err);
942        return;
943      }
944      ServerName serverName = location.getServerName();
945      if (serverName == null) {
946        future
947          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
948        return;
949      }
950      addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> {
951        if (err2 != null) {
952          future.completeExceptionally(err2);
953        } else {
954          future.complete(ret);
955        }
956      });
957    });
958    return future;
959  }
960
961  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo,
962      byte[] columnFamily) {
963    return this.<Void> newAdminCaller()
964            .serverName(serverName)
965            .action(
966              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
967                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
968                  .getRegionName(), columnFamily, false),
969                (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null))
970            .call();
971  }
972
973  @Override
974  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
975    CompletableFuture<Void> future = new CompletableFuture<>();
976    addListener(getRegions(sn), (hRegionInfos, err) -> {
977      if (err != null) {
978        future.completeExceptionally(err);
979        return;
980      }
981      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
982      if (hRegionInfos != null) {
983        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null)));
984      }
985      addListener(CompletableFuture.allOf(
986        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
987          if (err2 != null) {
988            future.completeExceptionally(err2);
989          } else {
990            future.complete(ret);
991          }
992        });
993    });
994    return future;
995  }
996
997  @Override
998  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
999    return compact(tableName, null, false, compactType);
1000  }
1001
1002  @Override
1003  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1004      CompactType compactType) {
1005    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1006        + "If you don't specify a columnFamily, use compact(TableName) instead");
1007    return compact(tableName, columnFamily, false, compactType);
1008  }
1009
1010  @Override
1011  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1012    return compactRegion(regionName, null, false);
1013  }
1014
1015  @Override
1016  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1017    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1018        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1019    return compactRegion(regionName, columnFamily, false);
1020  }
1021
1022  @Override
1023  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1024    return compact(tableName, null, true, compactType);
1025  }
1026
1027  @Override
1028  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1029      CompactType compactType) {
1030    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1031        + "If you don't specify a columnFamily, use compact(TableName) instead");
1032    return compact(tableName, columnFamily, true, compactType);
1033  }
1034
1035  @Override
1036  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1037    return compactRegion(regionName, null, true);
1038  }
1039
1040  @Override
1041  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1042    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1043        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1044    return compactRegion(regionName, columnFamily, true);
1045  }
1046
1047  @Override
1048  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1049    return compactRegionServer(sn, false);
1050  }
1051
1052  @Override
1053  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1054    return compactRegionServer(sn, true);
1055  }
1056
1057  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1058    CompletableFuture<Void> future = new CompletableFuture<>();
1059    addListener(getRegions(sn), (hRegionInfos, err) -> {
1060      if (err != null) {
1061        future.completeExceptionally(err);
1062        return;
1063      }
1064      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1065      if (hRegionInfos != null) {
1066        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1067      }
1068      addListener(CompletableFuture.allOf(
1069        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1070          if (err2 != null) {
1071            future.completeExceptionally(err2);
1072          } else {
1073            future.complete(ret);
1074          }
1075        });
1076    });
1077    return future;
1078  }
1079
1080  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1081      boolean major) {
1082    CompletableFuture<Void> future = new CompletableFuture<>();
1083    addListener(getRegionLocation(regionName), (location, err) -> {
1084      if (err != null) {
1085        future.completeExceptionally(err);
1086        return;
1087      }
1088      ServerName serverName = location.getServerName();
1089      if (serverName == null) {
1090        future
1091          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1092        return;
1093      }
1094      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1095        (ret, err2) -> {
1096          if (err2 != null) {
1097            future.completeExceptionally(err2);
1098          } else {
1099            future.complete(ret);
1100          }
1101        });
1102    });
1103    return future;
1104  }
1105
1106  /**
1107   * List all region locations for the specific table.
1108   */
1109  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1110    if (TableName.META_TABLE_NAME.equals(tableName)) {
1111      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1112      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1113        if (err != null) {
1114          future.completeExceptionally(err);
1115        } else if (metaRegions == null || metaRegions.isEmpty() ||
1116          metaRegions.getDefaultRegionLocation() == null) {
1117          future.completeExceptionally(new IOException("meta region does not found"));
1118        } else {
1119          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1120        }
1121      });
1122      return future;
1123    } else {
1124      // For non-meta table, we fetch all locations by scanning hbase:meta table
1125      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1126    }
1127  }
1128
1129  /**
1130   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1131   */
1132  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1133      CompactType compactType) {
1134    CompletableFuture<Void> future = new CompletableFuture<>();
1135
1136    switch (compactType) {
1137      case MOB:
1138        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1139          if (err != null) {
1140            future.completeExceptionally(err);
1141            return;
1142          }
1143          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1144          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1145            if (err2 != null) {
1146              future.completeExceptionally(err2);
1147            } else {
1148              future.complete(ret);
1149            }
1150          });
1151        });
1152        break;
1153      case NORMAL:
1154        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1155          if (err != null) {
1156            future.completeExceptionally(err);
1157            return;
1158          }
1159          if (locations == null || locations.isEmpty()) {
1160            future.completeExceptionally(new TableNotFoundException(tableName));
1161          }
1162          CompletableFuture<?>[] compactFutures =
1163            locations.stream().filter(l -> l.getRegion() != null)
1164              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1165              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1166              .toArray(CompletableFuture<?>[]::new);
1167          // future complete unless all of the compact futures are completed.
1168          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1169            if (err2 != null) {
1170              future.completeExceptionally(err2);
1171            } else {
1172              future.complete(ret);
1173            }
1174          });
1175        });
1176        break;
1177      default:
1178        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1179    }
1180    return future;
1181  }
1182
1183  /**
1184   * Compact the region at specific region server.
1185   */
1186  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1187      final boolean major, byte[] columnFamily) {
1188    return this
1189        .<Void> newAdminCaller()
1190        .serverName(sn)
1191        .action(
1192          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1193            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1194              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1195            resp -> null)).call();
1196  }
1197
1198  private byte[] toEncodeRegionName(byte[] regionName) {
1199    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1200      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1201  }
1202
1203  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1204      CompletableFuture<TableName> result) {
1205    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1206      if (err != null) {
1207        result.completeExceptionally(err);
1208        return;
1209      }
1210      RegionInfo regionInfo = location.getRegion();
1211      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1212        result.completeExceptionally(
1213          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1214        return;
1215      }
1216      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1217        if (!tableName.get().equals(regionInfo.getTable())) {
1218          // tables of this two region should be same.
1219          result.completeExceptionally(
1220            new IllegalArgumentException("Cannot merge regions from two different tables " +
1221              tableName.get() + " and " + regionInfo.getTable()));
1222        } else {
1223          result.complete(tableName.get());
1224        }
1225      }
1226    });
1227  }
1228
1229  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1230    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1231    CompletableFuture<TableName> future = new CompletableFuture<>();
1232    for (byte[] encodedRegionName : encodedRegionNames) {
1233      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1234    }
1235    return future;
1236  }
1237
1238  @Override
1239  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1240    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1241  }
1242
1243  @Override
1244  public CompletableFuture<Boolean> isMergeEnabled() {
1245    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1246  }
1247
1248  @Override
1249  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1250    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1251  }
1252
1253  @Override
1254  public CompletableFuture<Boolean> isSplitEnabled() {
1255    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1256  }
1257
1258  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1259      MasterSwitchType switchType) {
1260    SetSplitOrMergeEnabledRequest request =
1261      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1262    return this.<Boolean> newMasterCaller()
1263      .action((controller, stub) -> this
1264        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1265          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1266          (resp) -> resp.getPrevValueList().get(0)))
1267      .call();
1268  }
1269
1270  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1271    IsSplitOrMergeEnabledRequest request =
1272        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1273    return this
1274        .<Boolean> newMasterCaller()
1275        .action(
1276          (controller, stub) -> this
1277              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1278                controller, stub, request,
1279                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1280                (resp) -> resp.getEnabled())).call();
1281  }
1282
1283  @Override
1284  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1285    if (nameOfRegionsToMerge.size() < 2) {
1286      return failedFuture(new IllegalArgumentException(
1287        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1288    }
1289    CompletableFuture<Void> future = new CompletableFuture<>();
1290    byte[][] encodedNameOfRegionsToMerge =
1291      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1292
1293    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1294      if (err != null) {
1295        future.completeExceptionally(err);
1296        return;
1297      }
1298
1299      final MergeTableRegionsRequest request;
1300      try {
1301        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1302          forcible, ng.getNonceGroup(), ng.newNonce());
1303      } catch (DeserializationException e) {
1304        future.completeExceptionally(e);
1305        return;
1306      }
1307
1308      addListener(
1309        this.procedureCall(tableName, request,
1310          MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
1311          new MergeTableRegionProcedureBiConsumer(tableName)),
1312        (ret, err2) -> {
1313          if (err2 != null) {
1314            future.completeExceptionally(err2);
1315          } else {
1316            future.complete(ret);
1317          }
1318        });
1319    });
1320    return future;
1321  }
1322
1323  @Override
1324  public CompletableFuture<Void> split(TableName tableName) {
1325    CompletableFuture<Void> future = new CompletableFuture<>();
1326    addListener(tableExists(tableName), (exist, error) -> {
1327      if (error != null) {
1328        future.completeExceptionally(error);
1329        return;
1330      }
1331      if (!exist) {
1332        future.completeExceptionally(new TableNotFoundException(tableName));
1333        return;
1334      }
1335      addListener(
1336        metaTable
1337          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1338            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1339            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1340        (results, err2) -> {
1341          if (err2 != null) {
1342            future.completeExceptionally(err2);
1343            return;
1344          }
1345          if (results != null && !results.isEmpty()) {
1346            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1347            for (Result r : results) {
1348              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1349                continue;
1350              }
1351              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1352              if (rl != null) {
1353                for (HRegionLocation h : rl.getRegionLocations()) {
1354                  if (h != null && h.getServerName() != null) {
1355                    RegionInfo hri = h.getRegion();
1356                    if (hri == null || hri.isSplitParent() ||
1357                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1358                      continue;
1359                    }
1360                    splitFutures.add(split(hri, null));
1361                  }
1362                }
1363              }
1364            }
1365            addListener(
1366              CompletableFuture
1367                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1368              (ret, exception) -> {
1369                if (exception != null) {
1370                  future.completeExceptionally(exception);
1371                  return;
1372                }
1373                future.complete(ret);
1374              });
1375          } else {
1376            future.complete(null);
1377          }
1378        });
1379    });
1380    return future;
1381  }
1382
1383  @Override
1384  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1385    CompletableFuture<Void> result = new CompletableFuture<>();
1386    if (splitPoint == null) {
1387      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1388    }
1389    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1390      (loc, err) -> {
1391        if (err != null) {
1392          result.completeExceptionally(err);
1393        } else if (loc == null || loc.getRegion() == null) {
1394          result.completeExceptionally(new IllegalArgumentException(
1395            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1396        } else {
1397          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1398            if (err2 != null) {
1399              result.completeExceptionally(err2);
1400            } else {
1401              result.complete(ret);
1402            }
1403
1404          });
1405        }
1406      });
1407    return result;
1408  }
1409
1410  @Override
1411  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1412    CompletableFuture<Void> future = new CompletableFuture<>();
1413    addListener(getRegionLocation(regionName), (location, err) -> {
1414      if (err != null) {
1415        future.completeExceptionally(err);
1416        return;
1417      }
1418      RegionInfo regionInfo = location.getRegion();
1419      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1420        future
1421          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1422            "Replicas are auto-split when their primary is split."));
1423        return;
1424      }
1425      ServerName serverName = location.getServerName();
1426      if (serverName == null) {
1427        future
1428          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1429        return;
1430      }
1431      addListener(split(regionInfo, null), (ret, err2) -> {
1432        if (err2 != null) {
1433          future.completeExceptionally(err2);
1434        } else {
1435          future.complete(ret);
1436        }
1437      });
1438    });
1439    return future;
1440  }
1441
1442  @Override
1443  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1444    Preconditions.checkNotNull(splitPoint,
1445      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1446    CompletableFuture<Void> future = new CompletableFuture<>();
1447    addListener(getRegionLocation(regionName), (location, err) -> {
1448      if (err != null) {
1449        future.completeExceptionally(err);
1450        return;
1451      }
1452      RegionInfo regionInfo = location.getRegion();
1453      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1454        future
1455          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1456            "Replicas are auto-split when their primary is split."));
1457        return;
1458      }
1459      ServerName serverName = location.getServerName();
1460      if (serverName == null) {
1461        future
1462          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1463        return;
1464      }
1465      if (regionInfo.getStartKey() != null &&
1466        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1467        future.completeExceptionally(
1468          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1469        return;
1470      }
1471      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1472        if (err2 != null) {
1473          future.completeExceptionally(err2);
1474        } else {
1475          future.complete(ret);
1476        }
1477      });
1478    });
1479    return future;
1480  }
1481
1482  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1483    CompletableFuture<Void> future = new CompletableFuture<>();
1484    TableName tableName = hri.getTable();
1485    final SplitTableRegionRequest request;
1486    try {
1487      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1488        ng.newNonce());
1489    } catch (DeserializationException e) {
1490      future.completeExceptionally(e);
1491      return future;
1492    }
1493
1494    addListener(
1495      this.procedureCall(tableName,
1496        request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
1497        new SplitTableRegionProcedureBiConsumer(tableName)),
1498      (ret, err2) -> {
1499        if (err2 != null) {
1500          future.completeExceptionally(err2);
1501        } else {
1502          future.complete(ret);
1503        }
1504      });
1505    return future;
1506  }
1507
1508  @Override
1509  public CompletableFuture<Void> assign(byte[] regionName) {
1510    CompletableFuture<Void> future = new CompletableFuture<>();
1511    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1512      if (err != null) {
1513        future.completeExceptionally(err);
1514        return;
1515      }
1516      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1517        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1518          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1519          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1520        .call(), (ret, err2) -> {
1521          if (err2 != null) {
1522            future.completeExceptionally(err2);
1523          } else {
1524            future.complete(ret);
1525          }
1526        });
1527    });
1528    return future;
1529  }
1530
1531  @Override
1532  public CompletableFuture<Void> unassign(byte[] regionName) {
1533    CompletableFuture<Void> future = new CompletableFuture<>();
1534    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1535      if (err != null) {
1536        future.completeExceptionally(err);
1537        return;
1538      }
1539      addListener(
1540        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1541          .action(((controller, stub) -> this
1542            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1543              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1544              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1545          .call(),
1546        (ret, err2) -> {
1547          if (err2 != null) {
1548            future.completeExceptionally(err2);
1549          } else {
1550            future.complete(ret);
1551          }
1552        });
1553    });
1554    return future;
1555  }
1556
1557  @Override
1558  public CompletableFuture<Void> offline(byte[] regionName) {
1559    CompletableFuture<Void> future = new CompletableFuture<>();
1560    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1561      if (err != null) {
1562        future.completeExceptionally(err);
1563        return;
1564      }
1565      addListener(
1566        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1567          .action(((controller, stub) -> this
1568            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1569              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1570              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1571          .call(),
1572        (ret, err2) -> {
1573          if (err2 != null) {
1574            future.completeExceptionally(err2);
1575          } else {
1576            future.complete(ret);
1577          }
1578        });
1579    });
1580    return future;
1581  }
1582
1583  @Override
1584  public CompletableFuture<Void> move(byte[] regionName) {
1585    CompletableFuture<Void> future = new CompletableFuture<>();
1586    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1587      if (err != null) {
1588        future.completeExceptionally(err);
1589        return;
1590      }
1591      addListener(
1592        moveRegion(regionInfo,
1593          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1594        (ret, err2) -> {
1595          if (err2 != null) {
1596            future.completeExceptionally(err2);
1597          } else {
1598            future.complete(ret);
1599          }
1600        });
1601    });
1602    return future;
1603  }
1604
1605  @Override
1606  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1607    Preconditions.checkNotNull(destServerName,
1608      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1609    CompletableFuture<Void> future = new CompletableFuture<>();
1610    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1611      if (err != null) {
1612        future.completeExceptionally(err);
1613        return;
1614      }
1615      addListener(
1616        moveRegion(regionInfo, RequestConverter
1617          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1618        (ret, err2) -> {
1619          if (err2 != null) {
1620            future.completeExceptionally(err2);
1621          } else {
1622            future.complete(ret);
1623          }
1624        });
1625    });
1626    return future;
1627  }
1628
1629  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1630    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1631      .action(
1632        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1633          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1634      .call();
1635  }
1636
1637  @Override
1638  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1639    return this
1640        .<Void> newMasterCaller()
1641        .action(
1642          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1643            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1644            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1645  }
1646
1647  @Override
1648  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1649    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1650    Scan scan = QuotaTableUtil.makeScan(filter);
1651    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1652        .scan(scan, new AdvancedScanResultConsumer() {
1653          List<QuotaSettings> settings = new ArrayList<>();
1654
1655          @Override
1656          public void onNext(Result[] results, ScanController controller) {
1657            for (Result result : results) {
1658              try {
1659                QuotaTableUtil.parseResultToCollection(result, settings);
1660              } catch (IOException e) {
1661                controller.terminate();
1662                future.completeExceptionally(e);
1663              }
1664            }
1665          }
1666
1667          @Override
1668          public void onError(Throwable error) {
1669            future.completeExceptionally(error);
1670          }
1671
1672          @Override
1673          public void onComplete() {
1674            future.complete(settings);
1675          }
1676        });
1677    return future;
1678  }
1679
1680  @Override
1681  public CompletableFuture<Void> addReplicationPeer(String peerId,
1682      ReplicationPeerConfig peerConfig, boolean enabled) {
1683    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1684      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1685      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1686      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1687  }
1688
1689  @Override
1690  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1691    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1692      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1693      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1694      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1695  }
1696
1697  @Override
1698  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1699    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1700      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1701      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1702      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1703  }
1704
1705  @Override
1706  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1707    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1708      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1709      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1710      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1711  }
1712
1713  @Override
1714  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1715    return this
1716        .<ReplicationPeerConfig> newMasterCaller()
1717        .action(
1718          (controller, stub) -> this
1719              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1720                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
1721                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1722                (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1723  }
1724
1725  @Override
1726  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1727      ReplicationPeerConfig peerConfig) {
1728    return this
1729        .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1730          RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1731          (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1732          (resp) -> resp.getProcId(),
1733          new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1734  }
1735
1736  @Override
1737  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1738      Map<TableName, List<String>> tableCfs) {
1739    if (tableCfs == null) {
1740      return failedFuture(new ReplicationException("tableCfs is null"));
1741    }
1742
1743    CompletableFuture<Void> future = new CompletableFuture<Void>();
1744    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1745      if (!completeExceptionally(future, error)) {
1746        ReplicationPeerConfig newPeerConfig =
1747          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1748        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1749          if (!completeExceptionally(future, error)) {
1750            future.complete(result);
1751          }
1752        });
1753      }
1754    });
1755    return future;
1756  }
1757
1758  @Override
1759  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1760      Map<TableName, List<String>> tableCfs) {
1761    if (tableCfs == null) {
1762      return failedFuture(new ReplicationException("tableCfs is null"));
1763    }
1764
1765    CompletableFuture<Void> future = new CompletableFuture<Void>();
1766    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1767      if (!completeExceptionally(future, error)) {
1768        ReplicationPeerConfig newPeerConfig = null;
1769        try {
1770          newPeerConfig = ReplicationPeerConfigUtil
1771            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1772        } catch (ReplicationException e) {
1773          future.completeExceptionally(e);
1774          return;
1775        }
1776        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1777          if (!completeExceptionally(future, error)) {
1778            future.complete(result);
1779          }
1780        });
1781      }
1782    });
1783    return future;
1784  }
1785
1786  @Override
1787  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1788    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1789  }
1790
1791  @Override
1792  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1793    Preconditions.checkNotNull(pattern,
1794      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1795    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1796  }
1797
1798  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1799      ListReplicationPeersRequest request) {
1800    return this
1801        .<List<ReplicationPeerDescription>> newMasterCaller()
1802        .action(
1803          (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1804              List<ReplicationPeerDescription>> call(controller, stub, request,
1805                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1806                (resp) -> resp.getPeerDescList().stream()
1807                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1808                    .collect(Collectors.toList()))).call();
1809  }
1810
1811  @Override
1812  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1813    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1814    addListener(listTableDescriptors(), (tables, error) -> {
1815      if (!completeExceptionally(future, error)) {
1816        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1817        tables.forEach(table -> {
1818          Map<String, Integer> cfs = new HashMap<>();
1819          Stream.of(table.getColumnFamilies())
1820            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1821            .forEach(column -> {
1822              cfs.put(column.getNameAsString(), column.getScope());
1823            });
1824          if (!cfs.isEmpty()) {
1825            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1826          }
1827        });
1828        future.complete(replicatedTableCFs);
1829      }
1830    });
1831    return future;
1832  }
1833
1834  @Override
1835  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1836    SnapshotProtos.SnapshotDescription snapshot =
1837      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1838    try {
1839      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1840    } catch (IllegalArgumentException e) {
1841      return failedFuture(e);
1842    }
1843    CompletableFuture<Void> future = new CompletableFuture<>();
1844    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1845    addListener(this.<Long> newMasterCaller()
1846      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1847        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1848        resp -> resp.getExpectedTimeout()))
1849      .call(), (expectedTimeout, err) -> {
1850        if (err != null) {
1851          future.completeExceptionally(err);
1852          return;
1853        }
1854        TimerTask pollingTask = new TimerTask() {
1855          int tries = 0;
1856          long startTime = EnvironmentEdgeManager.currentTime();
1857          long endTime = startTime + expectedTimeout;
1858          long maxPauseTime = expectedTimeout / maxAttempts;
1859
1860          @Override
1861          public void run(Timeout timeout) throws Exception {
1862            if (EnvironmentEdgeManager.currentTime() < endTime) {
1863              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1864                if (err2 != null) {
1865                  future.completeExceptionally(err2);
1866                } else if (done) {
1867                  future.complete(null);
1868                } else {
1869                  // retry again after pauseTime.
1870                  long pauseTime =
1871                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1872                  pauseTime = Math.min(pauseTime, maxPauseTime);
1873                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1874                    TimeUnit.MILLISECONDS);
1875                }
1876              });
1877            } else {
1878              future.completeExceptionally(
1879                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1880                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1881            }
1882          }
1883        };
1884        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1885      });
1886    return future;
1887  }
1888
1889  @Override
1890  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1891    return this
1892        .<Boolean> newMasterCaller()
1893        .action(
1894          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1895            controller,
1896            stub,
1897            IsSnapshotDoneRequest.newBuilder()
1898                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1899                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1900  }
1901
1902  @Override
1903  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1904    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1905      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1906      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1907    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1908  }
1909
1910  @Override
1911  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1912      boolean restoreAcl) {
1913    CompletableFuture<Void> future = new CompletableFuture<>();
1914    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1915      if (err != null) {
1916        future.completeExceptionally(err);
1917        return;
1918      }
1919      TableName tableName = null;
1920      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1921        for (SnapshotDescription snap : snapshotDescriptions) {
1922          if (snap.getName().equals(snapshotName)) {
1923            tableName = snap.getTableName();
1924            break;
1925          }
1926        }
1927      }
1928      if (tableName == null) {
1929        future.completeExceptionally(new RestoreSnapshotException(
1930          "Unable to find the table name for snapshot=" + snapshotName));
1931        return;
1932      }
1933      final TableName finalTableName = tableName;
1934      addListener(tableExists(finalTableName), (exists, err2) -> {
1935        if (err2 != null) {
1936          future.completeExceptionally(err2);
1937        } else if (!exists) {
1938          // if table does not exist, then just clone snapshot into new table.
1939          completeConditionalOnFuture(future,
1940            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1941        } else {
1942          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1943            if (err4 != null) {
1944              future.completeExceptionally(err4);
1945            } else if (!disabled) {
1946              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1947            } else {
1948              completeConditionalOnFuture(future,
1949                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1950            }
1951          });
1952        }
1953      });
1954    });
1955    return future;
1956  }
1957
1958  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1959      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1960    if (takeFailSafeSnapshot) {
1961      CompletableFuture<Void> future = new CompletableFuture<>();
1962      // Step.1 Take a snapshot of the current state
1963      String failSafeSnapshotSnapshotNameFormat =
1964        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1965          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1966      final String failSafeSnapshotSnapshotName =
1967        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1968          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1969          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1970      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1971      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1972        if (err != null) {
1973          future.completeExceptionally(err);
1974        } else {
1975          // Step.2 Restore snapshot
1976          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1977            (void2, err2) -> {
1978              if (err2 != null) {
1979                // Step.3.a Something went wrong during the restore and try to rollback.
1980                addListener(
1981                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1982                  (void3, err3) -> {
1983                    if (err3 != null) {
1984                      future.completeExceptionally(err3);
1985                    } else {
1986                      String msg =
1987                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1988                          failSafeSnapshotSnapshotName + " succeeded.";
1989                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1990                    }
1991                  });
1992              } else {
1993                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1994                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1995                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1996                  if (err3 != null) {
1997                    LOG.error(
1998                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1999                      err3);
2000                    future.completeExceptionally(err3);
2001                  } else {
2002                    future.complete(ret3);
2003                  }
2004                });
2005              }
2006            });
2007        }
2008      });
2009      return future;
2010    } else {
2011      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
2012    }
2013  }
2014
2015  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2016      CompletableFuture<T> parentFuture) {
2017    addListener(parentFuture, (res, err) -> {
2018      if (err != null) {
2019        dependentFuture.completeExceptionally(err);
2020      } else {
2021        dependentFuture.complete(res);
2022      }
2023    });
2024  }
2025
2026  @Override
2027  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2028      boolean restoreAcl) {
2029    CompletableFuture<Void> future = new CompletableFuture<>();
2030    addListener(tableExists(tableName), (exists, err) -> {
2031      if (err != null) {
2032        future.completeExceptionally(err);
2033      } else if (exists) {
2034        future.completeExceptionally(new TableExistsException(tableName));
2035      } else {
2036        completeConditionalOnFuture(future,
2037          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2038      }
2039    });
2040    return future;
2041  }
2042
2043  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2044      boolean restoreAcl) {
2045    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2046      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2047    try {
2048      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2049    } catch (IllegalArgumentException e) {
2050      return failedFuture(e);
2051    }
2052    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2053      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2054        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2055          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2056        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2057      .call());
2058  }
2059
2060  @Override
2061  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2062    return getCompletedSnapshots(null);
2063  }
2064
2065  @Override
2066  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2067    Preconditions.checkNotNull(pattern,
2068      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2069    return getCompletedSnapshots(pattern);
2070  }
2071
2072  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2073    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2074        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2075        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2076          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2077          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2078        .call();
2079  }
2080
2081  @Override
2082  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2083    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2084        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2085    return getCompletedSnapshots(tableNamePattern, null);
2086  }
2087
2088  @Override
2089  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2090      Pattern snapshotNamePattern) {
2091    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2092        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2093    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2094        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2095    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2096  }
2097
2098  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2099      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2100    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2101    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2102      if (err != null) {
2103        future.completeExceptionally(err);
2104        return;
2105      }
2106      if (tableNames == null || tableNames.size() <= 0) {
2107        future.complete(Collections.emptyList());
2108        return;
2109      }
2110      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2111        if (err2 != null) {
2112          future.completeExceptionally(err2);
2113          return;
2114        }
2115        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2116          future.complete(Collections.emptyList());
2117          return;
2118        }
2119        future.complete(snapshotDescList.stream()
2120          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2121          .collect(Collectors.toList()));
2122      });
2123    });
2124    return future;
2125  }
2126
2127  @Override
2128  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2129    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2130  }
2131
2132  @Override
2133  public CompletableFuture<Void> deleteSnapshots() {
2134    return internalDeleteSnapshots(null, null);
2135  }
2136
2137  @Override
2138  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2139    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2140        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2141    return internalDeleteSnapshots(null, snapshotNamePattern);
2142  }
2143
2144  @Override
2145  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2146    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2147        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2148    return internalDeleteSnapshots(tableNamePattern, null);
2149  }
2150
2151  @Override
2152  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2153      Pattern snapshotNamePattern) {
2154    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2155        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2156    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2157        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2158    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2159  }
2160
2161  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2162      Pattern snapshotNamePattern) {
2163    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2164    if (tableNamePattern == null) {
2165      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2166    } else {
2167      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2168    }
2169    CompletableFuture<Void> future = new CompletableFuture<>();
2170    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2171      if (err != null) {
2172        future.completeExceptionally(err);
2173        return;
2174      }
2175      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2176        future.complete(null);
2177        return;
2178      }
2179      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2180        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2181          if (e != null) {
2182            future.completeExceptionally(e);
2183          } else {
2184            future.complete(v);
2185          }
2186        });
2187    }));
2188    return future;
2189  }
2190
2191  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2192    return this
2193        .<Void> newMasterCaller()
2194        .action(
2195          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2196            controller,
2197            stub,
2198            DeleteSnapshotRequest.newBuilder()
2199                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2200                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2201  }
2202
2203  @Override
2204  public CompletableFuture<Void> execProcedure(String signature, String instance,
2205      Map<String, String> props) {
2206    CompletableFuture<Void> future = new CompletableFuture<>();
2207    ProcedureDescription procDesc =
2208      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2209    addListener(this.<Long> newMasterCaller()
2210      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2211        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2212        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2213      .call(), (expectedTimeout, err) -> {
2214        if (err != null) {
2215          future.completeExceptionally(err);
2216          return;
2217        }
2218        TimerTask pollingTask = new TimerTask() {
2219          int tries = 0;
2220          long startTime = EnvironmentEdgeManager.currentTime();
2221          long endTime = startTime + expectedTimeout;
2222          long maxPauseTime = expectedTimeout / maxAttempts;
2223
2224          @Override
2225          public void run(Timeout timeout) throws Exception {
2226            if (EnvironmentEdgeManager.currentTime() < endTime) {
2227              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2228                if (err2 != null) {
2229                  future.completeExceptionally(err2);
2230                  return;
2231                }
2232                if (done) {
2233                  future.complete(null);
2234                } else {
2235                  // retry again after pauseTime.
2236                  long pauseTime =
2237                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2238                  pauseTime = Math.min(pauseTime, maxPauseTime);
2239                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2240                    TimeUnit.MICROSECONDS);
2241                }
2242              });
2243            } else {
2244              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2245                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2246            }
2247          }
2248        };
2249        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2250        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2251      });
2252    return future;
2253  }
2254
2255  @Override
2256  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2257      Map<String, String> props) {
2258    ProcedureDescription proDesc =
2259        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2260    return this.<byte[]> newMasterCaller()
2261        .action(
2262          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2263            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2264            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2265            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2266        .call();
2267  }
2268
2269  @Override
2270  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2271      Map<String, String> props) {
2272    ProcedureDescription proDesc =
2273        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2274    return this.<Boolean> newMasterCaller()
2275        .action((controller, stub) -> this
2276            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2277              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2278              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2279        .call();
2280  }
2281
2282  @Override
2283  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2284    return this.<Boolean> newMasterCaller().action(
2285      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2286        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2287        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2288        .call();
2289  }
2290
2291  @Override
2292  public CompletableFuture<String> getProcedures() {
2293    return this
2294        .<String> newMasterCaller()
2295        .action(
2296          (controller, stub) -> this
2297              .<GetProceduresRequest, GetProceduresResponse, String> call(
2298                controller, stub, GetProceduresRequest.newBuilder().build(),
2299                (s, c, req, done) -> s.getProcedures(c, req, done),
2300                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2301  }
2302
2303  @Override
2304  public CompletableFuture<String> getLocks() {
2305    return this
2306        .<String> newMasterCaller()
2307        .action(
2308          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2309            controller, stub, GetLocksRequest.newBuilder().build(),
2310            (s, c, req, done) -> s.getLocks(c, req, done),
2311            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2312  }
2313
2314  @Override
2315  public CompletableFuture<Void> decommissionRegionServers(
2316      List<ServerName> servers, boolean offload) {
2317    return this.<Void> newMasterCaller()
2318        .action((controller, stub) -> this
2319          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2320            controller, stub,
2321              RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2322            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2323        .call();
2324  }
2325
2326  @Override
2327  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2328    return this.<List<ServerName>> newMasterCaller()
2329        .action((controller, stub) -> this
2330          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2331            List<ServerName>> call(
2332              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2333              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2334              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2335                  .collect(Collectors.toList())))
2336        .call();
2337  }
2338
2339  @Override
2340  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2341      List<byte[]> encodedRegionNames) {
2342    return this.<Void> newMasterCaller()
2343        .action((controller, stub) ->
2344            this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2345                controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2346                    server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2347                        c, req, done), resp -> null)).call();
2348  }
2349
2350  /**
2351   * Get the region location for the passed region name. The region name may be a full region name
2352   * or encoded region name. If the region does not found, then it'll throw an
2353   * UnknownRegionException wrapped by a {@link CompletableFuture}
2354   * @param regionNameOrEncodedRegionName region name or encoded region name
2355   * @return region location, wrapped by a {@link CompletableFuture}
2356   */
2357  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2358    if (regionNameOrEncodedRegionName == null) {
2359      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2360    }
2361    try {
2362      CompletableFuture<Optional<HRegionLocation>> future;
2363      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2364        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2365        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2366          // old format encodedName, should be meta region
2367          future = connection.registry.getMetaRegionLocations()
2368            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2369              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2370        } else {
2371          future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2372            regionNameOrEncodedRegionName);
2373        }
2374      } else {
2375        RegionInfo regionInfo =
2376          MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2377        if (regionInfo.isMetaRegion()) {
2378          future = connection.registry.getMetaRegionLocations()
2379            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2380              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2381              .findFirst());
2382        } else {
2383          future =
2384            AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2385        }
2386      }
2387
2388      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2389      addListener(future, (location, err) -> {
2390        if (err != null) {
2391          returnedFuture.completeExceptionally(err);
2392          return;
2393        }
2394        if (!location.isPresent() || location.get().getRegion() == null) {
2395          returnedFuture.completeExceptionally(
2396            new UnknownRegionException("Invalid region name or encoded region name: " +
2397              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2398        } else {
2399          returnedFuture.complete(location.get());
2400        }
2401      });
2402      return returnedFuture;
2403    } catch (IOException e) {
2404      return failedFuture(e);
2405    }
2406  }
2407
2408  /**
2409   * Get the region info for the passed region name. The region name may be a full region name or
2410   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2411   * wrapped by a {@link CompletableFuture}
2412   * @return region info, wrapped by a {@link CompletableFuture}
2413   */
2414  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2415    if (regionNameOrEncodedRegionName == null) {
2416      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2417    }
2418
2419    if (Bytes.equals(regionNameOrEncodedRegionName,
2420      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2421      Bytes.equals(regionNameOrEncodedRegionName,
2422        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2423      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2424    }
2425
2426    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2427    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2428      if (err != null) {
2429        future.completeExceptionally(err);
2430      } else {
2431        future.complete(location.getRegion());
2432      }
2433    });
2434    return future;
2435  }
2436
2437  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2438    if (numRegions < 3) {
2439      throw new IllegalArgumentException("Must create at least three regions");
2440    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2441      throw new IllegalArgumentException("Start key must be smaller than end key");
2442    }
2443    if (numRegions == 3) {
2444      return new byte[][] { startKey, endKey };
2445    }
2446    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2447    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2448      throw new IllegalArgumentException("Unable to split key range into enough regions");
2449    }
2450    return splitKeys;
2451  }
2452
2453  private void verifySplitKeys(byte[][] splitKeys) {
2454    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2455    // Verify there are no duplicate split keys
2456    byte[] lastKey = null;
2457    for (byte[] splitKey : splitKeys) {
2458      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2459        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2460      }
2461      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2462        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2463            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2464      }
2465      lastKey = splitKey;
2466    }
2467  }
2468
2469  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2470
2471    abstract void onFinished();
2472
2473    abstract void onError(Throwable error);
2474
2475    @Override
2476    public void accept(Void v, Throwable error) {
2477      if (error != null) {
2478        onError(error);
2479        return;
2480      }
2481      onFinished();
2482    }
2483  }
2484
2485  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2486    protected final TableName tableName;
2487
2488    TableProcedureBiConsumer(TableName tableName) {
2489      this.tableName = tableName;
2490    }
2491
2492    abstract String getOperationType();
2493
2494    String getDescription() {
2495      return "Operation: " + getOperationType() + ", " + "Table Name: "
2496          + tableName.getNameWithNamespaceInclAsString();
2497    }
2498
2499    @Override
2500    void onFinished() {
2501      LOG.info(getDescription() + " completed");
2502    }
2503
2504    @Override
2505    void onError(Throwable error) {
2506      LOG.info(getDescription() + " failed with " + error.getMessage());
2507    }
2508  }
2509
2510  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2511    protected final String namespaceName;
2512
2513    NamespaceProcedureBiConsumer(String namespaceName) {
2514      this.namespaceName = namespaceName;
2515    }
2516
2517    abstract String getOperationType();
2518
2519    String getDescription() {
2520      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2521    }
2522
2523    @Override
2524    void onFinished() {
2525      LOG.info(getDescription() + " completed");
2526    }
2527
2528    @Override
2529    void onError(Throwable error) {
2530      LOG.info(getDescription() + " failed with " + error.getMessage());
2531    }
2532  }
2533
2534  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2535
2536    CreateTableProcedureBiConsumer(TableName tableName) {
2537      super(tableName);
2538    }
2539
2540    @Override
2541    String getOperationType() {
2542      return "CREATE";
2543    }
2544  }
2545
2546  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2547
2548    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2549      super(tableName);
2550    }
2551
2552    @Override
2553    String getOperationType() {
2554      return "ENABLE";
2555    }
2556  }
2557
2558  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2559
2560    DeleteTableProcedureBiConsumer(TableName tableName) {
2561      super(tableName);
2562    }
2563
2564    @Override
2565    String getOperationType() {
2566      return "DELETE";
2567    }
2568
2569    @Override
2570    void onFinished() {
2571      connection.getLocator().clearCache(this.tableName);
2572      super.onFinished();
2573    }
2574  }
2575
2576  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2577
2578    TruncateTableProcedureBiConsumer(TableName tableName) {
2579      super(tableName);
2580    }
2581
2582    @Override
2583    String getOperationType() {
2584      return "TRUNCATE";
2585    }
2586  }
2587
2588  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2589
2590    EnableTableProcedureBiConsumer(TableName tableName) {
2591      super(tableName);
2592    }
2593
2594    @Override
2595    String getOperationType() {
2596      return "ENABLE";
2597    }
2598  }
2599
2600  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2601
2602    DisableTableProcedureBiConsumer(TableName tableName) {
2603      super(tableName);
2604    }
2605
2606    @Override
2607    String getOperationType() {
2608      return "DISABLE";
2609    }
2610  }
2611
2612  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2613
2614    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2615      super(tableName);
2616    }
2617
2618    @Override
2619    String getOperationType() {
2620      return "ADD_COLUMN_FAMILY";
2621    }
2622  }
2623
2624  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2625
2626    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2627      super(tableName);
2628    }
2629
2630    @Override
2631    String getOperationType() {
2632      return "DELETE_COLUMN_FAMILY";
2633    }
2634  }
2635
2636  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2637
2638    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2639      super(tableName);
2640    }
2641
2642    @Override
2643    String getOperationType() {
2644      return "MODIFY_COLUMN_FAMILY";
2645    }
2646  }
2647
2648  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2649
2650    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2651      super(namespaceName);
2652    }
2653
2654    @Override
2655    String getOperationType() {
2656      return "CREATE_NAMESPACE";
2657    }
2658  }
2659
2660  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2661
2662    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2663      super(namespaceName);
2664    }
2665
2666    @Override
2667    String getOperationType() {
2668      return "DELETE_NAMESPACE";
2669    }
2670  }
2671
2672  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2673
2674    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2675      super(namespaceName);
2676    }
2677
2678    @Override
2679    String getOperationType() {
2680      return "MODIFY_NAMESPACE";
2681    }
2682  }
2683
2684  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2685
2686    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2687      super(tableName);
2688    }
2689
2690    @Override
2691    String getOperationType() {
2692      return "MERGE_REGIONS";
2693    }
2694  }
2695
2696  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2697
2698    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2699      super(tableName);
2700    }
2701
2702    @Override
2703    String getOperationType() {
2704      return "SPLIT_REGION";
2705    }
2706  }
2707
2708  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2709    private final String peerId;
2710    private final Supplier<String> getOperation;
2711
2712    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2713      this.peerId = peerId;
2714      this.getOperation = getOperation;
2715    }
2716
2717    String getDescription() {
2718      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2719    }
2720
2721    @Override
2722    void onFinished() {
2723      LOG.info(getDescription() + " completed");
2724    }
2725
2726    @Override
2727    void onError(Throwable error) {
2728      LOG.info(getDescription() + " failed with " + error.getMessage());
2729    }
2730  }
2731
2732  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2733    CompletableFuture<Void> future = new CompletableFuture<>();
2734    addListener(procFuture, (procId, error) -> {
2735      if (error != null) {
2736        future.completeExceptionally(error);
2737        return;
2738      }
2739      getProcedureResult(procId, future, 0);
2740    });
2741    return future;
2742  }
2743
2744  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2745    addListener(
2746      this.<GetProcedureResultResponse> newMasterCaller()
2747        .action((controller, stub) -> this
2748          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2749            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2750            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2751        .call(),
2752      (response, error) -> {
2753        if (error != null) {
2754          LOG.warn("failed to get the procedure result procId={}", procId,
2755            ConnectionUtils.translateException(error));
2756          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2757            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2758          return;
2759        }
2760        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2761          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2762            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2763          return;
2764        }
2765        if (response.hasException()) {
2766          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2767          future.completeExceptionally(ioe);
2768        } else {
2769          future.complete(null);
2770        }
2771      });
2772  }
2773
2774  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2775    CompletableFuture<T> future = new CompletableFuture<>();
2776    future.completeExceptionally(error);
2777    return future;
2778  }
2779
2780  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2781    if (error != null) {
2782      future.completeExceptionally(error);
2783      return true;
2784    }
2785    return false;
2786  }
2787
2788  @Override
2789  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2790    return getClusterMetrics(EnumSet.allOf(Option.class));
2791  }
2792
2793  @Override
2794  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2795    return this
2796        .<ClusterMetrics> newMasterCaller()
2797        .action(
2798          (controller, stub) -> this
2799              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2800                stub, RequestConverter.buildGetClusterStatusRequest(options),
2801                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2802                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2803  }
2804
2805  @Override
2806  public CompletableFuture<Void> shutdown() {
2807    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2808      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2809        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2810        resp -> null))
2811      .call();
2812  }
2813
2814  @Override
2815  public CompletableFuture<Void> stopMaster() {
2816    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2817      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2818        controller, stub, StopMasterRequest.newBuilder().build(),
2819        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2820      .call();
2821  }
2822
2823  @Override
2824  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2825    StopServerRequest request = RequestConverter
2826      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2827    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2828      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2829        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2830        resp -> null))
2831      .serverName(serverName).call();
2832  }
2833
2834  @Override
2835  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2836    return this
2837        .<Void> newAdminCaller()
2838        .action(
2839          (controller, stub) -> this
2840              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2841                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2842                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2843        .serverName(serverName).call();
2844  }
2845
2846  @Override
2847  public CompletableFuture<Void> updateConfiguration() {
2848    CompletableFuture<Void> future = new CompletableFuture<Void>();
2849    addListener(
2850      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2851      (status, err) -> {
2852        if (err != null) {
2853          future.completeExceptionally(err);
2854        } else {
2855          List<CompletableFuture<Void>> futures = new ArrayList<>();
2856          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2857          futures.add(updateConfiguration(status.getMasterName()));
2858          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2859          addListener(
2860            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2861            (result, err2) -> {
2862              if (err2 != null) {
2863                future.completeExceptionally(err2);
2864              } else {
2865                future.complete(result);
2866              }
2867            });
2868        }
2869      });
2870    return future;
2871  }
2872
2873  @Override
2874  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2875    return this
2876        .<Void> newAdminCaller()
2877        .action(
2878          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2879            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2880            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2881        .serverName(serverName).call();
2882  }
2883
2884  @Override
2885  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2886    return this
2887        .<Void> newAdminCaller()
2888        .action(
2889          (controller, stub) -> this
2890              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2891                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2892                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2893        .serverName(serverName).call();
2894  }
2895
2896  @Override
2897  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2898    return this
2899        .<List<SecurityCapability>> newMasterCaller()
2900        .action(
2901          (controller, stub) -> this
2902              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
2903                  call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
2904                    (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2905                    (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2906        .call();
2907  }
2908
2909  @Override
2910  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2911    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2912  }
2913
2914  @Override
2915  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2916      TableName tableName) {
2917    Preconditions.checkNotNull(tableName,
2918      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2919    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2920  }
2921
2922  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2923      ServerName serverName) {
2924    return this.<List<RegionMetrics>> newAdminCaller()
2925        .action((controller, stub) -> this
2926            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2927              adminCall(controller, stub, request, (s, c, req, done) ->
2928                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2929        .serverName(serverName).call();
2930  }
2931
2932  @Override
2933  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2934    return this
2935        .<Boolean> newMasterCaller()
2936        .action(
2937          (controller, stub) -> this
2938              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2939                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2940                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2941                resp -> resp.getInMaintenanceMode())).call();
2942  }
2943
2944  @Override
2945  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2946      CompactType compactType) {
2947    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2948
2949    switch (compactType) {
2950      case MOB:
2951        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
2952          if (err != null) {
2953            future.completeExceptionally(err);
2954            return;
2955          }
2956          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2957
2958          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2959            .action((controller, stub) -> this
2960              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2961                controller, stub,
2962                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2963                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2964            .call(), (resp2, err2) -> {
2965              if (err2 != null) {
2966                future.completeExceptionally(err2);
2967              } else {
2968                if (resp2.hasCompactionState()) {
2969                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2970                } else {
2971                  future.complete(CompactionState.NONE);
2972                }
2973              }
2974            });
2975        });
2976        break;
2977      case NORMAL:
2978        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2979          if (err != null) {
2980            future.completeExceptionally(err);
2981            return;
2982          }
2983          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
2984          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2985          locations.stream().filter(loc -> loc.getServerName() != null)
2986            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2987            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2988              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
2989                // If any region compaction state is MAJOR_AND_MINOR
2990                // the table compaction state is MAJOR_AND_MINOR, too.
2991                if (err2 != null) {
2992                  future.completeExceptionally(unwrapCompletionException(err2));
2993                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
2994                  future.complete(regionState);
2995                } else {
2996                  regionStates.add(regionState);
2997                }
2998              }));
2999            });
3000          addListener(
3001            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3002            (ret, err3) -> {
3003              // If future not completed, check all regions's compaction state
3004              if (!future.isCompletedExceptionally() && !future.isDone()) {
3005                CompactionState state = CompactionState.NONE;
3006                for (CompactionState regionState : regionStates) {
3007                  switch (regionState) {
3008                    case MAJOR:
3009                      if (state == CompactionState.MINOR) {
3010                        future.complete(CompactionState.MAJOR_AND_MINOR);
3011                      } else {
3012                        state = CompactionState.MAJOR;
3013                      }
3014                      break;
3015                    case MINOR:
3016                      if (state == CompactionState.MAJOR) {
3017                        future.complete(CompactionState.MAJOR_AND_MINOR);
3018                      } else {
3019                        state = CompactionState.MINOR;
3020                      }
3021                      break;
3022                    case NONE:
3023                    default:
3024                  }
3025                }
3026                if (!future.isDone()) {
3027                  future.complete(state);
3028                }
3029              }
3030            });
3031        });
3032        break;
3033      default:
3034        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3035    }
3036
3037    return future;
3038  }
3039
3040  @Override
3041  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3042    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3043    addListener(getRegionLocation(regionName), (location, err) -> {
3044      if (err != null) {
3045        future.completeExceptionally(err);
3046        return;
3047      }
3048      ServerName serverName = location.getServerName();
3049      if (serverName == null) {
3050        future
3051          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3052        return;
3053      }
3054      addListener(
3055        this.<GetRegionInfoResponse> newAdminCaller()
3056          .action((controller, stub) -> this
3057            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3058              controller, stub,
3059              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3060                true),
3061              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3062          .serverName(serverName).call(),
3063        (resp2, err2) -> {
3064          if (err2 != null) {
3065            future.completeExceptionally(err2);
3066          } else {
3067            if (resp2.hasCompactionState()) {
3068              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3069            } else {
3070              future.complete(CompactionState.NONE);
3071            }
3072          }
3073        });
3074    });
3075    return future;
3076  }
3077
3078  @Override
3079  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3080    MajorCompactionTimestampRequest request =
3081        MajorCompactionTimestampRequest.newBuilder()
3082            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3083    return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3084        this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3085            call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3086                c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3087  }
3088
3089  @Override
3090  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3091      byte[] regionName) {
3092    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3093    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3094    addListener(getRegionInfo(regionName), (region, err) -> {
3095      if (err != null) {
3096        future.completeExceptionally(err);
3097        return;
3098      }
3099      MajorCompactionTimestampForRegionRequest.Builder builder =
3100        MajorCompactionTimestampForRegionRequest.newBuilder();
3101      builder.setRegion(
3102        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3103      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3104        .<MajorCompactionTimestampForRegionRequest,
3105        MajorCompactionTimestampResponse, Optional<Long>> call(
3106          controller, stub, builder.build(),
3107          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3108          ProtobufUtil::toOptionalTimestamp))
3109        .call(), (timestamp, err2) -> {
3110          if (err2 != null) {
3111            future.completeExceptionally(err2);
3112          } else {
3113            future.complete(timestamp);
3114          }
3115        });
3116    });
3117    return future;
3118  }
3119
3120  @Override
3121  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3122      List<String> serverNamesList) {
3123    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3124    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3125      if (err != null) {
3126        future.completeExceptionally(err);
3127        return;
3128      }
3129      // Accessed by multiple threads.
3130      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3131      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3132      serverNames.stream().forEach(serverName -> {
3133        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3134          if (err2 != null) {
3135            future.completeExceptionally(unwrapCompletionException(err2));
3136          } else {
3137            serverStates.put(serverName, serverState);
3138          }
3139        }));
3140      });
3141      addListener(
3142        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3143        (ret, err3) -> {
3144          if (!future.isCompletedExceptionally()) {
3145            if (err3 != null) {
3146              future.completeExceptionally(err3);
3147            } else {
3148              future.complete(serverStates);
3149            }
3150          }
3151        });
3152    });
3153    return future;
3154  }
3155
3156  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3157    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3158    if (serverNamesList.isEmpty()) {
3159      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3160        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3161      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3162        if (err != null) {
3163          future.completeExceptionally(err);
3164        } else {
3165          future.complete(clusterMetrics.getServersName());
3166        }
3167      });
3168      return future;
3169    } else {
3170      List<ServerName> serverList = new ArrayList<>();
3171      for (String regionServerName : serverNamesList) {
3172        ServerName serverName = null;
3173        try {
3174          serverName = ServerName.valueOf(regionServerName);
3175        } catch (Exception e) {
3176          future.completeExceptionally(
3177            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3178        }
3179        if (serverName == null) {
3180          future.completeExceptionally(
3181            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3182        } else {
3183          serverList.add(serverName);
3184        }
3185      }
3186      future.complete(serverList);
3187    }
3188    return future;
3189  }
3190
3191  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3192    return this
3193        .<Boolean>newAdminCaller()
3194        .serverName(serverName)
3195        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3196            Boolean>adminCall(controller, stub,
3197            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3198            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3199  }
3200
3201  @Override
3202  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3203    return this.<Boolean> newMasterCaller()
3204      .action((controller, stub) -> this
3205        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3206          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3207          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3208          (resp) -> resp.getPrevBalanceValue()))
3209      .call();
3210  }
3211
3212  @Override
3213  public CompletableFuture<Boolean> balance(boolean forcible) {
3214    return this
3215        .<Boolean> newMasterCaller()
3216        .action(
3217          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3218            stub, RequestConverter.buildBalanceRequest(forcible),
3219            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3220  }
3221
3222  @Override
3223  public CompletableFuture<Boolean> isBalancerEnabled() {
3224    return this
3225        .<Boolean> newMasterCaller()
3226        .action((controller, stub) ->
3227              this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3228            stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3229                  -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3230  }
3231
3232  @Override
3233  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3234    return this
3235        .<Boolean> newMasterCaller()
3236        .action(
3237          (controller, stub) -> this
3238              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3239                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3240                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3241                    .getPrevNormalizerValue())).call();
3242  }
3243
3244  @Override
3245  public CompletableFuture<Boolean> isNormalizerEnabled() {
3246    return this
3247        .<Boolean> newMasterCaller()
3248        .action(
3249          (controller, stub) -> this
3250              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3251                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3252                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3253                (resp) -> resp.getEnabled())).call();
3254  }
3255
3256  @Override
3257  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3258    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3259  }
3260
3261  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3262    return this
3263      .<Boolean> newMasterCaller()
3264      .action(
3265        (controller, stub) -> this.call(
3266          controller, stub, request, MasterService.Interface::normalize,
3267          NormalizeResponse::getNormalizerRan))
3268      .call();
3269  }
3270
3271  @Override
3272  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3273    return this
3274        .<Boolean> newMasterCaller()
3275        .action(
3276          (controller, stub) -> this
3277              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3278                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3279                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3280                    .getPrevValue())).call();
3281  }
3282
3283  @Override
3284  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3285    return this
3286        .<Boolean> newMasterCaller()
3287        .action(
3288          (controller, stub) -> this
3289              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3290                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3291                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3292        .call();
3293  }
3294
3295  @Override
3296  public CompletableFuture<Boolean> runCleanerChore() {
3297    return this
3298        .<Boolean> newMasterCaller()
3299        .action(
3300          (controller, stub) -> this
3301              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3302                RequestConverter.buildRunCleanerChoreRequest(),
3303                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3304                (resp) -> resp.getCleanerChoreRan())).call();
3305  }
3306
3307  @Override
3308  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3309    return this
3310        .<Boolean> newMasterCaller()
3311        .action(
3312          (controller, stub) -> this
3313              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3314                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3315                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3316                    .getPrevValue())).call();
3317  }
3318
3319  @Override
3320  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3321    return this
3322        .<Boolean> newMasterCaller()
3323        .action(
3324          (controller, stub) -> this
3325              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3326                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3327                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3328                    .getValue())).call();
3329  }
3330
3331  @Override
3332  public CompletableFuture<Integer> runCatalogJanitor() {
3333    return this
3334        .<Integer> newMasterCaller()
3335        .action(
3336          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3337            controller, stub, RequestConverter.buildCatalogScanRequest(),
3338            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3339        .call();
3340  }
3341
3342  @Override
3343  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3344      ServiceCaller<S, R> callable) {
3345    MasterCoprocessorRpcChannelImpl channel =
3346        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3347    S stub = stubMaker.apply(channel);
3348    CompletableFuture<R> future = new CompletableFuture<>();
3349    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3350    callable.call(stub, controller, resp -> {
3351      if (controller.failed()) {
3352        future.completeExceptionally(controller.getFailed());
3353      } else {
3354        future.complete(resp);
3355      }
3356    });
3357    return future;
3358  }
3359
3360  @Override
3361  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3362      ServiceCaller<S, R> callable, ServerName serverName) {
3363    RegionServerCoprocessorRpcChannelImpl channel =
3364        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3365          serverName));
3366    S stub = stubMaker.apply(channel);
3367    CompletableFuture<R> future = new CompletableFuture<>();
3368    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3369    callable.call(stub, controller, resp -> {
3370      if (controller.failed()) {
3371        future.completeExceptionally(controller.getFailed());
3372      } else {
3373        future.complete(resp);
3374      }
3375    });
3376    return future;
3377  }
3378
3379  @Override
3380  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3381    return this.<List<ServerName>> newMasterCaller()
3382      .action((controller, stub) -> this
3383        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3384          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3385          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3386          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3387      .call();
3388  }
3389
3390  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3391    return this.connection.callerFactory.<T> serverRequest()
3392      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3393      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3394      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3395      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3396  }
3397
3398  @Override
3399  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3400    if (tableName == null) {
3401      return failedFuture(new IllegalArgumentException("Table name is null"));
3402    }
3403    CompletableFuture<Void> future = new CompletableFuture<>();
3404    addListener(tableExists(tableName), (exist, err) -> {
3405      if (err != null) {
3406        future.completeExceptionally(err);
3407        return;
3408      }
3409      if (!exist) {
3410        future.completeExceptionally(new TableNotFoundException(
3411          "Table '" + tableName.getNameAsString() + "' does not exists."));
3412        return;
3413      }
3414      addListener(getTableSplits(tableName), (splits, err1) -> {
3415        if (err1 != null) {
3416          future.completeExceptionally(err1);
3417        } else {
3418          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3419            if (err2 != null) {
3420              future.completeExceptionally(err2);
3421            } else {
3422              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3423                if (err3 != null) {
3424                  future.completeExceptionally(err3);
3425                } else {
3426                  future.complete(result3);
3427                }
3428              });
3429            }
3430          });
3431        }
3432      });
3433    });
3434    return future;
3435  }
3436
3437  @Override
3438  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3439    if (tableName == null) {
3440      return failedFuture(new IllegalArgumentException("Table name is null"));
3441    }
3442    CompletableFuture<Void> future = new CompletableFuture<>();
3443    addListener(tableExists(tableName), (exist, err) -> {
3444      if (err != null) {
3445        future.completeExceptionally(err);
3446        return;
3447      }
3448      if (!exist) {
3449        future.completeExceptionally(new TableNotFoundException(
3450          "Table '" + tableName.getNameAsString() + "' does not exists."));
3451        return;
3452      }
3453      addListener(setTableReplication(tableName, false), (result, err2) -> {
3454        if (err2 != null) {
3455          future.completeExceptionally(err2);
3456        } else {
3457          future.complete(result);
3458        }
3459      });
3460    });
3461    return future;
3462  }
3463
3464  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3465    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3466    addListener(
3467      getRegions(tableName).thenApply(regions -> regions.stream()
3468        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3469      (regions, err2) -> {
3470        if (err2 != null) {
3471          future.completeExceptionally(err2);
3472          return;
3473        }
3474        if (regions.size() == 1) {
3475          future.complete(null);
3476        } else {
3477          byte[][] splits = new byte[regions.size() - 1][];
3478          for (int i = 1; i < regions.size(); i++) {
3479            splits[i - 1] = regions.get(i).getStartKey();
3480          }
3481          future.complete(splits);
3482        }
3483      });
3484    return future;
3485  }
3486
3487  /**
3488   * Connect to peer and check the table descriptor on peer:
3489   * <ol>
3490   * <li>Create the same table on peer when not exist.</li>
3491   * <li>Throw an exception if the table already has replication enabled on any of the column
3492   * families.</li>
3493   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3494   * </ol>
3495   * @param tableName name of the table to sync to the peer
3496   * @param splits table split keys
3497   */
3498  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3499      byte[][] splits) {
3500    CompletableFuture<Void> future = new CompletableFuture<>();
3501    addListener(listReplicationPeers(), (peers, err) -> {
3502      if (err != null) {
3503        future.completeExceptionally(err);
3504        return;
3505      }
3506      if (peers == null || peers.size() <= 0) {
3507        future.completeExceptionally(
3508          new IllegalArgumentException("Found no peer cluster for replication."));
3509        return;
3510      }
3511      List<CompletableFuture<Void>> futures = new ArrayList<>();
3512      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3513        .forEach(peer -> {
3514          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3515        });
3516      addListener(
3517        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3518        (result, err2) -> {
3519          if (err2 != null) {
3520            future.completeExceptionally(err2);
3521          } else {
3522            future.complete(result);
3523          }
3524        });
3525    });
3526    return future;
3527  }
3528
3529  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3530      ReplicationPeerDescription peer) {
3531    Configuration peerConf = null;
3532    try {
3533      peerConf =
3534        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3535    } catch (IOException e) {
3536      return failedFuture(e);
3537    }
3538    CompletableFuture<Void> future = new CompletableFuture<>();
3539    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3540      if (err != null) {
3541        future.completeExceptionally(err);
3542        return;
3543      }
3544      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3545        if (err1 != null) {
3546          future.completeExceptionally(err1);
3547          return;
3548        }
3549        AsyncAdmin peerAdmin = conn.getAdmin();
3550        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3551          if (err2 != null) {
3552            future.completeExceptionally(err2);
3553            return;
3554          }
3555          if (!exist) {
3556            CompletableFuture<Void> createTableFuture = null;
3557            if (splits == null) {
3558              createTableFuture = peerAdmin.createTable(tableDesc);
3559            } else {
3560              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3561            }
3562            addListener(createTableFuture, (result, err3) -> {
3563              if (err3 != null) {
3564                future.completeExceptionally(err3);
3565              } else {
3566                future.complete(result);
3567              }
3568            });
3569          } else {
3570            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3571              (result, err4) -> {
3572                if (err4 != null) {
3573                  future.completeExceptionally(err4);
3574                } else {
3575                  future.complete(result);
3576                }
3577              });
3578          }
3579        });
3580      });
3581    });
3582    return future;
3583  }
3584
3585  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3586      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3587    CompletableFuture<Void> future = new CompletableFuture<>();
3588    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3589      if (err != null) {
3590        future.completeExceptionally(err);
3591        return;
3592      }
3593      if (peerTableDesc == null) {
3594        future.completeExceptionally(
3595          new IllegalArgumentException("Failed to get table descriptor for table " +
3596            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3597        return;
3598      }
3599      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3600        future.completeExceptionally(new IllegalArgumentException(
3601          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3602            ", but the table descriptors are not same when compared with source cluster." +
3603            " Thus can not enable the table's replication switch."));
3604        return;
3605      }
3606      future.complete(null);
3607    });
3608    return future;
3609  }
3610
3611  /**
3612   * Set the table's replication switch if the table's replication switch is already not set.
3613   * @param tableName name of the table
3614   * @param enableRep is replication switch enable or disable
3615   */
3616  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3617    CompletableFuture<Void> future = new CompletableFuture<>();
3618    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3619      if (err != null) {
3620        future.completeExceptionally(err);
3621        return;
3622      }
3623      if (!tableDesc.matchReplicationScope(enableRep)) {
3624        int scope =
3625          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3626        TableDescriptor newTableDesc =
3627          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3628        addListener(modifyTable(newTableDesc), (result, err2) -> {
3629          if (err2 != null) {
3630            future.completeExceptionally(err2);
3631          } else {
3632            future.complete(result);
3633          }
3634        });
3635      } else {
3636        future.complete(null);
3637      }
3638    });
3639    return future;
3640  }
3641
3642  @Override
3643  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3644    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3645    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3646      if (err != null) {
3647        future.completeExceptionally(err);
3648        return;
3649      }
3650      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3651        locations.stream().filter(l -> l.getRegion() != null)
3652          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3653          .collect(Collectors.groupingBy(l -> l.getServerName(),
3654            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3655      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3656      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3657      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3658        futures
3659          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3660            if (err2 != null) {
3661              future.completeExceptionally(unwrapCompletionException(err2));
3662            } else {
3663              aggregator.append(stats);
3664            }
3665          }));
3666      }
3667      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3668        (ret, err3) -> {
3669          if (err3 != null) {
3670            future.completeExceptionally(unwrapCompletionException(err3));
3671          } else {
3672            future.complete(aggregator.sum());
3673          }
3674        });
3675    });
3676    return future;
3677  }
3678
3679  @Override
3680  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3681      boolean preserveSplits) {
3682    CompletableFuture<Void> future = new CompletableFuture<>();
3683    addListener(tableExists(tableName), (exist, err) -> {
3684      if (err != null) {
3685        future.completeExceptionally(err);
3686        return;
3687      }
3688      if (!exist) {
3689        future.completeExceptionally(new TableNotFoundException(tableName));
3690        return;
3691      }
3692      addListener(tableExists(newTableName), (exist1, err1) -> {
3693        if (err1 != null) {
3694          future.completeExceptionally(err1);
3695          return;
3696        }
3697        if (exist1) {
3698          future.completeExceptionally(new TableExistsException(newTableName));
3699          return;
3700        }
3701        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3702          if (err2 != null) {
3703            future.completeExceptionally(err2);
3704            return;
3705          }
3706          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3707          if (preserveSplits) {
3708            addListener(getTableSplits(tableName), (splits, err3) -> {
3709              if (err3 != null) {
3710                future.completeExceptionally(err3);
3711              } else {
3712                addListener(
3713                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3714                  (result, err4) -> {
3715                    if (err4 != null) {
3716                      future.completeExceptionally(err4);
3717                    } else {
3718                      future.complete(result);
3719                    }
3720                  });
3721              }
3722            });
3723          } else {
3724            addListener(createTable(newTableDesc), (result, err5) -> {
3725              if (err5 != null) {
3726                future.completeExceptionally(err5);
3727              } else {
3728                future.complete(result);
3729              }
3730            });
3731          }
3732        });
3733      });
3734    });
3735    return future;
3736  }
3737
3738  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3739      List<RegionInfo> hris) {
3740    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3741      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3742        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3743        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3744        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3745      .serverName(serverName).call();
3746  }
3747
3748  @Override
3749  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3750    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3751        .action((controller, stub) -> this
3752            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3753              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3754              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3755              resp -> resp.getPreviousRpcThrottleEnabled()))
3756        .call();
3757    return future;
3758  }
3759
3760  @Override
3761  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3762    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3763        .action((controller, stub) -> this
3764            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3765              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3766              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3767              resp -> resp.getRpcThrottleEnabled()))
3768        .call();
3769    return future;
3770  }
3771
3772  @Override
3773  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3774    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3775        .action((controller, stub) -> this
3776            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3777              controller, stub,
3778              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3779                  .build(),
3780              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3781              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3782        .call();
3783    return future;
3784  }
3785
3786  @Override
3787  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3788    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3789      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3790      Map<TableName, Long>> call(controller, stub,
3791        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3792        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3793        resp -> resp.getSizesList().stream().collect(Collectors
3794          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3795      .call();
3796  }
3797
3798  @Override
3799  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3800      ServerName serverName) {
3801    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3802      .action((controller, stub) -> this
3803        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3804        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3805          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3806          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3807          resp -> resp.getSnapshotsList().stream()
3808            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3809              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3810      .serverName(serverName).call();
3811  }
3812
3813  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3814      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3815    return this.<SpaceQuotaSnapshot> newMasterCaller()
3816      .action((controller, stub) -> this
3817        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3818          RequestConverter.buildGetQuotaStatesRequest(),
3819          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3820      .call();
3821  }
3822
3823  @Override
3824  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3825    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3826      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3827      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3828  }
3829
3830  @Override
3831  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3832    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3833    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3834      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3835      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3836  }
3837
3838  @Override
3839  public CompletableFuture<Void> grant(UserPermission userPermission,
3840      boolean mergeExistingPermissions) {
3841    return this.<Void> newMasterCaller()
3842        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3843          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3844          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3845        .call();
3846  }
3847
3848  @Override
3849  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3850    return this.<Void> newMasterCaller()
3851        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3852          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3853          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3854        .call();
3855  }
3856
3857  @Override
3858  public CompletableFuture<List<UserPermission>>
3859      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3860    return this.<List<UserPermission>> newMasterCaller().action((controller,
3861        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3862            List<UserPermission>> call(controller, stub,
3863              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3864              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3865              resp -> resp.getUserPermissionList().stream()
3866                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3867                .collect(Collectors.toList())))
3868        .call();
3869  }
3870
3871  @Override
3872  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3873      List<Permission> permissions) {
3874    return this.<List<Boolean>> newMasterCaller()
3875        .action((controller, stub) -> this
3876            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3877              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3878              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3879              resp -> resp.getHasUserPermissionList()))
3880        .call();
3881  }
3882
3883  @Override
3884  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3885      final boolean sync) {
3886    return this.<Boolean>newMasterCaller()
3887        .action((controller, stub) -> this
3888            .call(controller, stub,
3889                RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3890                MasterService.Interface::switchSnapshotCleanup,
3891                SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
3892        .call();
3893  }
3894
3895  @Override
3896  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3897    return this.<Boolean>newMasterCaller()
3898        .action((controller, stub) -> this
3899            .call(controller, stub,
3900                RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3901                MasterService.Interface::isSnapshotCleanupEnabled,
3902                IsSnapshotCleanupEnabledResponse::getEnabled))
3903        .call();
3904  }
3905
3906  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
3907      final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
3908      final String logType) {
3909    if (CollectionUtils.isEmpty(serverNames)) {
3910      return CompletableFuture.completedFuture(Collections.emptyList());
3911    }
3912    return CompletableFuture.supplyAsync(() -> serverNames.stream()
3913      .map((ServerName serverName) ->
3914        getSlowLogResponseFromServer(serverName, filterParams, limit, logType))
3915      .map(CompletableFuture::join)
3916      .flatMap(List::stream)
3917      .collect(Collectors.toList()));
3918  }
3919
3920  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
3921      Map<String, Object> filterParams, int limit, String logType) {
3922    return this.<List<LogEntry>>newAdminCaller().action((controller, stub) -> this
3923      .adminCall(controller, stub,
3924        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
3925        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
3926      .serverName(serverName).call();
3927  }
3928
3929  @Override
3930  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
3931      @Nullable Set<ServerName> serverNames) {
3932    if (CollectionUtils.isEmpty(serverNames)) {
3933      return CompletableFuture.completedFuture(Collections.emptyList());
3934    }
3935    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
3936      .map(this::clearSlowLogsResponses)
3937      .collect(Collectors.toList());
3938    return convertToFutureOfList(clearSlowLogResponseList);
3939  }
3940
3941  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
3942    return this.<Boolean>newAdminCaller()
3943      .action(((controller, stub) -> this
3944        .adminCall(
3945          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
3946          AdminService.Interface::clearSlowLogsResponses,
3947          ProtobufUtil::toClearSlowLogPayload))
3948      ).serverName(serverName).call();
3949  }
3950
3951  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
3952    List<CompletableFuture<T>> futures) {
3953    CompletableFuture<Void> allDoneFuture =
3954      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
3955    return allDoneFuture.thenApply(v ->
3956      futures.stream()
3957        .map(CompletableFuture::join)
3958        .collect(Collectors.toList())
3959    );
3960  }
3961
3962  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
3963    return this.<List<LogEntry>>newMasterCaller()
3964      .action((controller, stub) ->
3965        this.call(controller, stub,
3966          ProtobufUtil.toBalancerDecisionRequest(limit),
3967          MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse))
3968      .call();
3969  }
3970
3971  @Override
3972  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
3973      String logType, ServerType serverType, int limit,
3974      Map<String, Object> filterParams) {
3975    if (logType == null || serverType == null) {
3976      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
3977    }
3978    if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
3979      if (ServerType.MASTER.equals(serverType)) {
3980        throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
3981      }
3982      return getSlowLogResponses(filterParams, serverNames, limit, logType);
3983    } else if (logType.equals("BALANCER_DECISION")) {
3984      if (ServerType.REGION_SERVER.equals(serverType)) {
3985        throw new IllegalArgumentException(
3986          "Balancer Decision logs are not maintained by HRegionServer");
3987      }
3988      return getBalancerDecisions(limit);
3989    }
3990    return CompletableFuture.completedFuture(Collections.emptyList());
3991  }
3992}