001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024import com.google.protobuf.Message;
025import com.google.protobuf.RpcChannel;
026import edu.umd.cs.findbugs.annotations.Nullable;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MetaTableAccessor;
060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
061import org.apache.hadoop.hbase.NamespaceDescriptor;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.UnknownRegionException;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.Scan.ReadType;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.DeserializationException;
080import org.apache.hadoop.hbase.ipc.HBaseRpcController;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.yetus.audience.InterfaceAudience;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
102import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
103import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
104import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
105import org.apache.hbase.thirdparty.io.netty.util.Timeout;
106import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
107import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
108
109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
110import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
209    .IsSnapshotCleanupEnabledResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
261    .SetSnapshotCleanupResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
302
303/**
304 * The implementation of AsyncAdmin.
305 * <p>
306 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
307 * be finished inside the rpc framework thread, which means that the callbacks registered to the
308 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
309 * this class should not try to do time consuming tasks in the callbacks.
310 * @since 2.0.0
311 * @see AsyncHBaseAdmin
312 * @see AsyncConnection#getAdmin()
313 * @see AsyncConnection#getAdminBuilder()
314 */
315@InterfaceAudience.Private
316class RawAsyncHBaseAdmin implements AsyncAdmin {
317  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
318
319  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
320
321  private final AsyncConnectionImpl connection;
322
323  private final HashedWheelTimer retryTimer;
324
325  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
326
327  private final long rpcTimeoutNs;
328
329  private final long operationTimeoutNs;
330
331  private final long pauseNs;
332
333  private final long pauseForCQTBENs;
334
335  private final int maxAttempts;
336
337  private final int startLogErrorsCnt;
338
339  private final NonceGenerator ng;
340
341  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
342      AsyncAdminBuilderBase builder) {
343    this.connection = connection;
344    this.retryTimer = retryTimer;
345    this.metaTable = connection.getTable(META_TABLE_NAME);
346    this.rpcTimeoutNs = builder.rpcTimeoutNs;
347    this.operationTimeoutNs = builder.operationTimeoutNs;
348    this.pauseNs = builder.pauseNs;
349    if (builder.pauseForCQTBENs < builder.pauseNs) {
350      LOG.warn(
351        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
352          " the normal pause value {} ms, use the greater one instead",
353        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
354        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
355      this.pauseForCQTBENs = builder.pauseNs;
356    } else {
357      this.pauseForCQTBENs = builder.pauseForCQTBENs;
358    }
359    this.maxAttempts = builder.maxAttempts;
360    this.startLogErrorsCnt = builder.startLogErrorsCnt;
361    this.ng = connection.getNonceGenerator();
362  }
363
364  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
365    return this.connection.callerFactory.<T> masterRequest()
366      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
367      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
368      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
369      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
370  }
371
372  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
373    return this.connection.callerFactory.<T> adminRequest()
374      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
375      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
376      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
377      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
378  }
379
380  @FunctionalInterface
381  private interface MasterRpcCall<RESP, REQ> {
382    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
383        RpcCallback<RESP> done);
384  }
385
386  @FunctionalInterface
387  private interface AdminRpcCall<RESP, REQ> {
388    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
389        RpcCallback<RESP> done);
390  }
391
392  @FunctionalInterface
393  private interface Converter<D, S> {
394    D convert(S src) throws IOException;
395  }
396
397  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
398      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
399      Converter<RESP, PRESP> respConverter) {
400    CompletableFuture<RESP> future = new CompletableFuture<>();
401    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
402
403      @Override
404      public void run(PRESP resp) {
405        if (controller.failed()) {
406          future.completeExceptionally(controller.getFailed());
407        } else {
408          try {
409            future.complete(respConverter.convert(resp));
410          } catch (IOException e) {
411            future.completeExceptionally(e);
412          }
413        }
414      }
415    });
416    return future;
417  }
418
419  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
420      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
421      Converter<RESP, PRESP> respConverter) {
422    CompletableFuture<RESP> future = new CompletableFuture<>();
423    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
424
425      @Override
426      public void run(PRESP resp) {
427        if (controller.failed()) {
428          future.completeExceptionally(controller.getFailed());
429        } else {
430          try {
431            future.complete(respConverter.convert(resp));
432          } catch (IOException e) {
433            future.completeExceptionally(e);
434          }
435        }
436      }
437    });
438    return future;
439  }
440
441  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
442      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
443      ProcedureBiConsumer consumer) {
444    return procedureCall(b -> {
445    }, preq, rpcCall, respConverter, consumer);
446  }
447
448  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
449      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
450      ProcedureBiConsumer consumer) {
451    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
452  }
453
454  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
455      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
456      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
457      ProcedureBiConsumer consumer) {
458    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
459        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
460    prioritySetter.accept(builder);
461    CompletableFuture<Long> procFuture = builder.call();
462    CompletableFuture<Void> future = waitProcedureResult(procFuture);
463    addListener(future, consumer);
464    return future;
465  }
466
467  @FunctionalInterface
468  private interface TableOperator {
469    CompletableFuture<Void> operate(TableName table);
470  }
471
472  @Override
473  public CompletableFuture<Boolean> tableExists(TableName tableName) {
474    if (TableName.isMetaTableName(tableName)) {
475      return CompletableFuture.completedFuture(true);
476    }
477    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
478  }
479
480  @Override
481  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
482    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
483      includeSysTables));
484  }
485
486  /**
487   * {@link #listTableDescriptors(boolean)}
488   */
489  @Override
490  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
491      boolean includeSysTables) {
492    Preconditions.checkNotNull(pattern,
493      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
494    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
495      includeSysTables));
496  }
497
498  @Override
499  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
500    Preconditions.checkNotNull(tableNames,
501      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
502    if (tableNames.isEmpty()) {
503      return CompletableFuture.completedFuture(Collections.emptyList());
504    }
505    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
506  }
507
508  private CompletableFuture<List<TableDescriptor>>
509      getTableDescriptors(GetTableDescriptorsRequest request) {
510    return this.<List<TableDescriptor>> newMasterCaller()
511        .action((controller, stub) -> this
512            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
513              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
514              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
515        .call();
516  }
517
518  @Override
519  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
520    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
521  }
522
523  @Override
524  public CompletableFuture<List<TableName>>
525      listTableNames(Pattern pattern, boolean includeSysTables) {
526    Preconditions.checkNotNull(pattern,
527        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
528    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
529  }
530
531  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
532    return this
533        .<List<TableName>> newMasterCaller()
534        .action(
535          (controller, stub) -> this
536              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
537                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
538                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
539  }
540
541  @Override
542  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
543    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
544        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
545        List<TableDescriptor>> call(
546          controller, stub,
547          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
548          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
549          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
550        .call();
551  }
552
553  @Override
554  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
555    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
556        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
557        List<TableName>> call(
558          controller, stub,
559          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
560          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
561          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
562        .call();
563  }
564
565  @Override
566  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
567    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
568    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
569      .action((controller, stub) -> this
570        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
571          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
572          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
573          (resp) -> resp.getTableSchemaList()))
574      .call(), (tableSchemas, error) -> {
575        if (error != null) {
576          future.completeExceptionally(error);
577          return;
578        }
579        if (!tableSchemas.isEmpty()) {
580          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
581        } else {
582          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
583        }
584      });
585    return future;
586  }
587
588  @Override
589  public CompletableFuture<Void> createTable(TableDescriptor desc) {
590    return createTable(desc.getTableName(),
591      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
592  }
593
594  @Override
595  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
596      int numRegions) {
597    try {
598      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
599    } catch (IllegalArgumentException e) {
600      return failedFuture(e);
601    }
602  }
603
604  @Override
605  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
606    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
607        + " use createTable(TableDescriptor) instead");
608    try {
609      verifySplitKeys(splitKeys);
610      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
611        splitKeys, ng.getNonceGroup(), ng.newNonce()));
612    } catch (IllegalArgumentException e) {
613      return failedFuture(e);
614    }
615  }
616
617  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
618    Preconditions.checkNotNull(tableName, "table name is null");
619    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
620      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
621      new CreateTableProcedureBiConsumer(tableName));
622  }
623
624  @Override
625  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
626    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
627      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
628        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
629      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
630  }
631
632  @Override
633  public CompletableFuture<Void> deleteTable(TableName tableName) {
634    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
635      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
636      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
637      new DeleteTableProcedureBiConsumer(tableName));
638  }
639
640  @Override
641  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
642    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
643      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
644        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
645      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
646  }
647
648  @Override
649  public CompletableFuture<Void> enableTable(TableName tableName) {
650    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
651      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
652      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
653      new EnableTableProcedureBiConsumer(tableName));
654  }
655
656  @Override
657  public CompletableFuture<Void> disableTable(TableName tableName) {
658    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
659      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
660      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
661      new DisableTableProcedureBiConsumer(tableName));
662  }
663
664  /**
665   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
666   * using passed parameters. Sets error or boolean result ('true' if table matches
667   * the passed-in targetState).
668   */
669  private static CompletableFuture<Boolean> completeCheckTableState(
670      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
671      TableState.State targetState, TableName tableName) {
672    if (error != null) {
673      future.completeExceptionally(error);
674    } else {
675      if (tableState != null) {
676        future.complete(tableState.inStates(targetState));
677      } else {
678        future.completeExceptionally(new TableNotFoundException(tableName));
679      }
680    }
681    return future;
682  }
683
684  @Override
685  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
686    if (TableName.isMetaTableName(tableName)) {
687      return CompletableFuture.completedFuture(true);
688    }
689    CompletableFuture<Boolean> future = new CompletableFuture<>();
690    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
691      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
692        TableState.State.ENABLED, tableName);
693    });
694    return future;
695  }
696
697  @Override
698  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
699    if (TableName.isMetaTableName(tableName)) {
700      return CompletableFuture.completedFuture(false);
701    }
702    CompletableFuture<Boolean> future = new CompletableFuture<>();
703    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
704      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
705        TableState.State.DISABLED, tableName);
706    });
707    return future;
708  }
709
710  @Override
711  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
712    return isTableAvailable(tableName, Optional.empty());
713  }
714
715  @Override
716  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
717    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
718        + " use isTableAvailable(TableName) instead");
719    return isTableAvailable(tableName, Optional.of(splitKeys));
720  }
721
722  private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
723      Optional<byte[][]> splitKeys) {
724    if (TableName.isMetaTableName(tableName)) {
725      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
726        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
727    }
728    CompletableFuture<Boolean> future = new CompletableFuture<>();
729    addListener(isTableEnabled(tableName), (enabled, error) -> {
730      if (error != null) {
731        if (error instanceof TableNotFoundException) {
732          future.complete(false);
733        } else {
734          future.completeExceptionally(error);
735        }
736        return;
737      }
738      if (!enabled) {
739        future.complete(false);
740      } else {
741        addListener(
742          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
743          (locations, error1) -> {
744            if (error1 != null) {
745              future.completeExceptionally(error1);
746              return;
747            }
748            List<HRegionLocation> notDeployedRegions = locations.stream()
749              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
750            if (notDeployedRegions.size() > 0) {
751              if (LOG.isDebugEnabled()) {
752                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
753              }
754              future.complete(false);
755              return;
756            }
757
758            Optional<Boolean> available =
759              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
760            future.complete(available.orElse(true));
761          });
762      }
763    });
764    return future;
765  }
766
767  private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
768    int regionCount = 0;
769    for (HRegionLocation location : locations) {
770      RegionInfo info = location.getRegion();
771      if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
772        regionCount++;
773        continue;
774      }
775      for (byte[] splitKey : splitKeys) {
776        // Just check if the splitkey is available
777        if (Bytes.equals(info.getStartKey(), splitKey)) {
778          regionCount++;
779          break;
780        }
781      }
782    }
783    return regionCount == splitKeys.length + 1;
784  }
785
786  @Override
787  public CompletableFuture<Void> addColumnFamily(
788      TableName tableName, ColumnFamilyDescriptor columnFamily) {
789    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
790      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
791        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
792      new AddColumnFamilyProcedureBiConsumer(tableName));
793  }
794
795  @Override
796  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
797    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
798      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
799        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
800      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
801  }
802
803  @Override
804  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
805      ColumnFamilyDescriptor columnFamily) {
806    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
807      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
808        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
809      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
810  }
811
812  @Override
813  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
814    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
815      RequestConverter.buildCreateNamespaceRequest(descriptor),
816      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
817      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
818  }
819
820  @Override
821  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
822    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
823      RequestConverter.buildModifyNamespaceRequest(descriptor),
824      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
825      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
826  }
827
828  @Override
829  public CompletableFuture<Void> deleteNamespace(String name) {
830    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
831      RequestConverter.buildDeleteNamespaceRequest(name),
832      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
833      new DeleteNamespaceProcedureBiConsumer(name));
834  }
835
836  @Override
837  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
838    return this
839        .<NamespaceDescriptor> newMasterCaller()
840        .action(
841          (controller, stub) -> this
842              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
843                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
844                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
845                      -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
846  }
847
848  @Override
849  public CompletableFuture<List<String>> listNamespaces() {
850    return this
851        .<List<String>> newMasterCaller()
852        .action(
853          (controller, stub) -> this
854              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
855                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
856                  done) -> s.listNamespaces(c, req, done),
857                (resp) -> resp.getNamespaceNameList())).call();
858  }
859
860  @Override
861  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
862    return this
863        .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
864              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
865                  List<NamespaceDescriptor>> call(controller, stub,
866                  ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
867                      s.listNamespaceDescriptors(c, req, done),
868                    (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
869  }
870
871  @Override
872  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
873    return this.<List<RegionInfo>> newAdminCaller()
874        .action((controller, stub) -> this
875            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
876              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
877              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
878              resp -> ProtobufUtil.getRegionInfos(resp)))
879        .serverName(serverName).call();
880  }
881
882  @Override
883  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
884    if (tableName.equals(META_TABLE_NAME)) {
885      return connection.registry.getMetaRegionLocations()
886        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
887          .collect(Collectors.toList()));
888    } else {
889      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
890        .thenApply(
891          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
892    }
893  }
894
895  @Override
896  public CompletableFuture<Void> flush(TableName tableName) {
897    CompletableFuture<Void> future = new CompletableFuture<>();
898    addListener(tableExists(tableName), (exists, err) -> {
899      if (err != null) {
900        future.completeExceptionally(err);
901      } else if (!exists) {
902        future.completeExceptionally(new TableNotFoundException(tableName));
903      } else {
904        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
905          if (err2 != null) {
906            future.completeExceptionally(err2);
907          } else if (!tableEnabled) {
908            future.completeExceptionally(new TableNotEnabledException(tableName));
909          } else {
910            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
911              new HashMap<>()), (ret, err3) -> {
912                if (err3 != null) {
913                  future.completeExceptionally(err3);
914                } else {
915                  future.complete(ret);
916                }
917              });
918          }
919        });
920      }
921    });
922    return future;
923  }
924
925  @Override
926  public CompletableFuture<Void> flushRegion(byte[] regionName) {
927    CompletableFuture<Void> future = new CompletableFuture<>();
928    addListener(getRegionLocation(regionName), (location, err) -> {
929      if (err != null) {
930        future.completeExceptionally(err);
931        return;
932      }
933      ServerName serverName = location.getServerName();
934      if (serverName == null) {
935        future
936          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
937        return;
938      }
939      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
940        if (err2 != null) {
941          future.completeExceptionally(err2);
942        } else {
943          future.complete(ret);
944        }
945      });
946    });
947    return future;
948  }
949
950  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
951    return this.<Void> newAdminCaller()
952            .serverName(serverName)
953            .action(
954              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
955                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
956                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
957                resp -> null))
958            .call();
959  }
960
961  @Override
962  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
963    CompletableFuture<Void> future = new CompletableFuture<>();
964    addListener(getRegions(sn), (hRegionInfos, err) -> {
965      if (err != null) {
966        future.completeExceptionally(err);
967        return;
968      }
969      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
970      if (hRegionInfos != null) {
971        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
972      }
973      addListener(CompletableFuture.allOf(
974        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
975          if (err2 != null) {
976            future.completeExceptionally(err2);
977          } else {
978            future.complete(ret);
979          }
980        });
981    });
982    return future;
983  }
984
985  @Override
986  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
987    return compact(tableName, null, false, compactType);
988  }
989
990  @Override
991  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
992      CompactType compactType) {
993    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
994        + "If you don't specify a columnFamily, use compact(TableName) instead");
995    return compact(tableName, columnFamily, false, compactType);
996  }
997
998  @Override
999  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1000    return compactRegion(regionName, null, false);
1001  }
1002
1003  @Override
1004  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1005    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1006        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1007    return compactRegion(regionName, columnFamily, false);
1008  }
1009
1010  @Override
1011  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1012    return compact(tableName, null, true, compactType);
1013  }
1014
1015  @Override
1016  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1017      CompactType compactType) {
1018    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1019        + "If you don't specify a columnFamily, use compact(TableName) instead");
1020    return compact(tableName, columnFamily, true, compactType);
1021  }
1022
1023  @Override
1024  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1025    return compactRegion(regionName, null, true);
1026  }
1027
1028  @Override
1029  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1030    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1031        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1032    return compactRegion(regionName, columnFamily, true);
1033  }
1034
1035  @Override
1036  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1037    return compactRegionServer(sn, false);
1038  }
1039
1040  @Override
1041  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1042    return compactRegionServer(sn, true);
1043  }
1044
1045  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1046    CompletableFuture<Void> future = new CompletableFuture<>();
1047    addListener(getRegions(sn), (hRegionInfos, err) -> {
1048      if (err != null) {
1049        future.completeExceptionally(err);
1050        return;
1051      }
1052      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1053      if (hRegionInfos != null) {
1054        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1055      }
1056      addListener(CompletableFuture.allOf(
1057        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1058          if (err2 != null) {
1059            future.completeExceptionally(err2);
1060          } else {
1061            future.complete(ret);
1062          }
1063        });
1064    });
1065    return future;
1066  }
1067
1068  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1069      boolean major) {
1070    CompletableFuture<Void> future = new CompletableFuture<>();
1071    addListener(getRegionLocation(regionName), (location, err) -> {
1072      if (err != null) {
1073        future.completeExceptionally(err);
1074        return;
1075      }
1076      ServerName serverName = location.getServerName();
1077      if (serverName == null) {
1078        future
1079          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1080        return;
1081      }
1082      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1083        (ret, err2) -> {
1084          if (err2 != null) {
1085            future.completeExceptionally(err2);
1086          } else {
1087            future.complete(ret);
1088          }
1089        });
1090    });
1091    return future;
1092  }
1093
1094  /**
1095   * List all region locations for the specific table.
1096   */
1097  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1098    if (TableName.META_TABLE_NAME.equals(tableName)) {
1099      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1100      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1101        if (err != null) {
1102          future.completeExceptionally(err);
1103        } else if (metaRegions == null || metaRegions.isEmpty() ||
1104          metaRegions.getDefaultRegionLocation() == null) {
1105          future.completeExceptionally(new IOException("meta region does not found"));
1106        } else {
1107          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1108        }
1109      });
1110      return future;
1111    } else {
1112      // For non-meta table, we fetch all locations by scanning hbase:meta table
1113      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1114    }
1115  }
1116
1117  /**
1118   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1119   */
1120  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1121      CompactType compactType) {
1122    CompletableFuture<Void> future = new CompletableFuture<>();
1123
1124    switch (compactType) {
1125      case MOB:
1126        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1127          if (err != null) {
1128            future.completeExceptionally(err);
1129            return;
1130          }
1131          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1132          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1133            if (err2 != null) {
1134              future.completeExceptionally(err2);
1135            } else {
1136              future.complete(ret);
1137            }
1138          });
1139        });
1140        break;
1141      case NORMAL:
1142        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1143          if (err != null) {
1144            future.completeExceptionally(err);
1145            return;
1146          }
1147          if (locations == null || locations.isEmpty()) {
1148            future.completeExceptionally(new TableNotFoundException(tableName));
1149          }
1150          CompletableFuture<?>[] compactFutures =
1151            locations.stream().filter(l -> l.getRegion() != null)
1152              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1153              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1154              .toArray(CompletableFuture<?>[]::new);
1155          // future complete unless all of the compact futures are completed.
1156          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1157            if (err2 != null) {
1158              future.completeExceptionally(err2);
1159            } else {
1160              future.complete(ret);
1161            }
1162          });
1163        });
1164        break;
1165      default:
1166        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1167    }
1168    return future;
1169  }
1170
1171  /**
1172   * Compact the region at specific region server.
1173   */
1174  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1175      final boolean major, byte[] columnFamily) {
1176    return this
1177        .<Void> newAdminCaller()
1178        .serverName(sn)
1179        .action(
1180          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1181            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1182              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1183            resp -> null)).call();
1184  }
1185
1186  private byte[] toEncodeRegionName(byte[] regionName) {
1187    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1188      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1189  }
1190
1191  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1192      CompletableFuture<TableName> result) {
1193    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1194      if (err != null) {
1195        result.completeExceptionally(err);
1196        return;
1197      }
1198      RegionInfo regionInfo = location.getRegion();
1199      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1200        result.completeExceptionally(
1201          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1202        return;
1203      }
1204      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1205        if (!tableName.get().equals(regionInfo.getTable())) {
1206          // tables of this two region should be same.
1207          result.completeExceptionally(
1208            new IllegalArgumentException("Cannot merge regions from two different tables " +
1209              tableName.get() + " and " + regionInfo.getTable()));
1210        } else {
1211          result.complete(tableName.get());
1212        }
1213      }
1214    });
1215  }
1216
1217  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1218    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1219    CompletableFuture<TableName> future = new CompletableFuture<>();
1220    for (byte[] encodedRegionName : encodedRegionNames) {
1221      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1222    }
1223    return future;
1224  }
1225
1226  @Override
1227  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1228    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1229  }
1230
1231  @Override
1232  public CompletableFuture<Boolean> isMergeEnabled() {
1233    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1234  }
1235
1236  @Override
1237  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1238    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1239  }
1240
1241  @Override
1242  public CompletableFuture<Boolean> isSplitEnabled() {
1243    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1244  }
1245
1246  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1247      MasterSwitchType switchType) {
1248    SetSplitOrMergeEnabledRequest request =
1249      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1250    return this.<Boolean> newMasterCaller()
1251      .action((controller, stub) -> this
1252        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1253          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1254          (resp) -> resp.getPrevValueList().get(0)))
1255      .call();
1256  }
1257
1258  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1259    IsSplitOrMergeEnabledRequest request =
1260        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1261    return this
1262        .<Boolean> newMasterCaller()
1263        .action(
1264          (controller, stub) -> this
1265              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1266                controller, stub, request,
1267                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1268                (resp) -> resp.getEnabled())).call();
1269  }
1270
1271  @Override
1272  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1273    if (nameOfRegionsToMerge.size() < 2) {
1274      return failedFuture(new IllegalArgumentException(
1275        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1276    }
1277    CompletableFuture<Void> future = new CompletableFuture<>();
1278    byte[][] encodedNameOfRegionsToMerge =
1279      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1280
1281    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1282      if (err != null) {
1283        future.completeExceptionally(err);
1284        return;
1285      }
1286
1287      final MergeTableRegionsRequest request;
1288      try {
1289        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1290          forcible, ng.getNonceGroup(), ng.newNonce());
1291      } catch (DeserializationException e) {
1292        future.completeExceptionally(e);
1293        return;
1294      }
1295
1296      addListener(
1297        this.procedureCall(tableName, request,
1298          MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
1299          new MergeTableRegionProcedureBiConsumer(tableName)),
1300        (ret, err2) -> {
1301          if (err2 != null) {
1302            future.completeExceptionally(err2);
1303          } else {
1304            future.complete(ret);
1305          }
1306        });
1307    });
1308    return future;
1309  }
1310
1311  @Override
1312  public CompletableFuture<Void> split(TableName tableName) {
1313    CompletableFuture<Void> future = new CompletableFuture<>();
1314    addListener(tableExists(tableName), (exist, error) -> {
1315      if (error != null) {
1316        future.completeExceptionally(error);
1317        return;
1318      }
1319      if (!exist) {
1320        future.completeExceptionally(new TableNotFoundException(tableName));
1321        return;
1322      }
1323      addListener(
1324        metaTable
1325          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1326            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1327            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1328        (results, err2) -> {
1329          if (err2 != null) {
1330            future.completeExceptionally(err2);
1331            return;
1332          }
1333          if (results != null && !results.isEmpty()) {
1334            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1335            for (Result r : results) {
1336              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1337                continue;
1338              }
1339              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1340              if (rl != null) {
1341                for (HRegionLocation h : rl.getRegionLocations()) {
1342                  if (h != null && h.getServerName() != null) {
1343                    RegionInfo hri = h.getRegion();
1344                    if (hri == null || hri.isSplitParent() ||
1345                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1346                      continue;
1347                    }
1348                    splitFutures.add(split(hri, null));
1349                  }
1350                }
1351              }
1352            }
1353            addListener(
1354              CompletableFuture
1355                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1356              (ret, exception) -> {
1357                if (exception != null) {
1358                  future.completeExceptionally(exception);
1359                  return;
1360                }
1361                future.complete(ret);
1362              });
1363          } else {
1364            future.complete(null);
1365          }
1366        });
1367    });
1368    return future;
1369  }
1370
1371  @Override
1372  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1373    CompletableFuture<Void> result = new CompletableFuture<>();
1374    if (splitPoint == null) {
1375      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1376    }
1377    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1378      (loc, err) -> {
1379        if (err != null) {
1380          result.completeExceptionally(err);
1381        } else if (loc == null || loc.getRegion() == null) {
1382          result.completeExceptionally(new IllegalArgumentException(
1383            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1384        } else {
1385          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1386            if (err2 != null) {
1387              result.completeExceptionally(err2);
1388            } else {
1389              result.complete(ret);
1390            }
1391
1392          });
1393        }
1394      });
1395    return result;
1396  }
1397
1398  @Override
1399  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1400    CompletableFuture<Void> future = new CompletableFuture<>();
1401    addListener(getRegionLocation(regionName), (location, err) -> {
1402      if (err != null) {
1403        future.completeExceptionally(err);
1404        return;
1405      }
1406      RegionInfo regionInfo = location.getRegion();
1407      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1408        future
1409          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1410            "Replicas are auto-split when their primary is split."));
1411        return;
1412      }
1413      ServerName serverName = location.getServerName();
1414      if (serverName == null) {
1415        future
1416          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1417        return;
1418      }
1419      addListener(split(regionInfo, null), (ret, err2) -> {
1420        if (err2 != null) {
1421          future.completeExceptionally(err2);
1422        } else {
1423          future.complete(ret);
1424        }
1425      });
1426    });
1427    return future;
1428  }
1429
1430  @Override
1431  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1432    Preconditions.checkNotNull(splitPoint,
1433      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1434    CompletableFuture<Void> future = new CompletableFuture<>();
1435    addListener(getRegionLocation(regionName), (location, err) -> {
1436      if (err != null) {
1437        future.completeExceptionally(err);
1438        return;
1439      }
1440      RegionInfo regionInfo = location.getRegion();
1441      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1442        future
1443          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1444            "Replicas are auto-split when their primary is split."));
1445        return;
1446      }
1447      ServerName serverName = location.getServerName();
1448      if (serverName == null) {
1449        future
1450          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1451        return;
1452      }
1453      if (regionInfo.getStartKey() != null &&
1454        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1455        future.completeExceptionally(
1456          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1457        return;
1458      }
1459      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1460        if (err2 != null) {
1461          future.completeExceptionally(err2);
1462        } else {
1463          future.complete(ret);
1464        }
1465      });
1466    });
1467    return future;
1468  }
1469
1470  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1471    CompletableFuture<Void> future = new CompletableFuture<>();
1472    TableName tableName = hri.getTable();
1473    final SplitTableRegionRequest request;
1474    try {
1475      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1476        ng.newNonce());
1477    } catch (DeserializationException e) {
1478      future.completeExceptionally(e);
1479      return future;
1480    }
1481
1482    addListener(
1483      this.procedureCall(tableName,
1484        request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
1485        new SplitTableRegionProcedureBiConsumer(tableName)),
1486      (ret, err2) -> {
1487        if (err2 != null) {
1488          future.completeExceptionally(err2);
1489        } else {
1490          future.complete(ret);
1491        }
1492      });
1493    return future;
1494  }
1495
1496  @Override
1497  public CompletableFuture<Void> assign(byte[] regionName) {
1498    CompletableFuture<Void> future = new CompletableFuture<>();
1499    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1500      if (err != null) {
1501        future.completeExceptionally(err);
1502        return;
1503      }
1504      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1505        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1506          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1507          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1508        .call(), (ret, err2) -> {
1509          if (err2 != null) {
1510            future.completeExceptionally(err2);
1511          } else {
1512            future.complete(ret);
1513          }
1514        });
1515    });
1516    return future;
1517  }
1518
1519  @Override
1520  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1521    CompletableFuture<Void> future = new CompletableFuture<>();
1522    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1523      if (err != null) {
1524        future.completeExceptionally(err);
1525        return;
1526      }
1527      addListener(
1528        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1529          .action(((controller, stub) -> this
1530            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1531              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1532              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1533          .call(),
1534        (ret, err2) -> {
1535          if (err2 != null) {
1536            future.completeExceptionally(err2);
1537          } else {
1538            future.complete(ret);
1539          }
1540        });
1541    });
1542    return future;
1543  }
1544
1545  @Override
1546  public CompletableFuture<Void> offline(byte[] regionName) {
1547    CompletableFuture<Void> future = new CompletableFuture<>();
1548    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1549      if (err != null) {
1550        future.completeExceptionally(err);
1551        return;
1552      }
1553      addListener(
1554        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1555          .action(((controller, stub) -> this
1556            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1557              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1558              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1559          .call(),
1560        (ret, err2) -> {
1561          if (err2 != null) {
1562            future.completeExceptionally(err2);
1563          } else {
1564            future.complete(ret);
1565          }
1566        });
1567    });
1568    return future;
1569  }
1570
1571  @Override
1572  public CompletableFuture<Void> move(byte[] regionName) {
1573    CompletableFuture<Void> future = new CompletableFuture<>();
1574    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1575      if (err != null) {
1576        future.completeExceptionally(err);
1577        return;
1578      }
1579      addListener(
1580        moveRegion(regionInfo,
1581          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1582        (ret, err2) -> {
1583          if (err2 != null) {
1584            future.completeExceptionally(err2);
1585          } else {
1586            future.complete(ret);
1587          }
1588        });
1589    });
1590    return future;
1591  }
1592
1593  @Override
1594  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1595    Preconditions.checkNotNull(destServerName,
1596      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1597    CompletableFuture<Void> future = new CompletableFuture<>();
1598    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1599      if (err != null) {
1600        future.completeExceptionally(err);
1601        return;
1602      }
1603      addListener(
1604        moveRegion(regionInfo, RequestConverter
1605          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1606        (ret, err2) -> {
1607          if (err2 != null) {
1608            future.completeExceptionally(err2);
1609          } else {
1610            future.complete(ret);
1611          }
1612        });
1613    });
1614    return future;
1615  }
1616
1617  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1618    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1619      .action(
1620        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1621          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1622      .call();
1623  }
1624
1625  @Override
1626  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1627    return this
1628        .<Void> newMasterCaller()
1629        .action(
1630          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1631            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1632            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1633  }
1634
1635  @Override
1636  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1637    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1638    Scan scan = QuotaTableUtil.makeScan(filter);
1639    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1640        .scan(scan, new AdvancedScanResultConsumer() {
1641          List<QuotaSettings> settings = new ArrayList<>();
1642
1643          @Override
1644          public void onNext(Result[] results, ScanController controller) {
1645            for (Result result : results) {
1646              try {
1647                QuotaTableUtil.parseResultToCollection(result, settings);
1648              } catch (IOException e) {
1649                controller.terminate();
1650                future.completeExceptionally(e);
1651              }
1652            }
1653          }
1654
1655          @Override
1656          public void onError(Throwable error) {
1657            future.completeExceptionally(error);
1658          }
1659
1660          @Override
1661          public void onComplete() {
1662            future.complete(settings);
1663          }
1664        });
1665    return future;
1666  }
1667
1668  @Override
1669  public CompletableFuture<Void> addReplicationPeer(String peerId,
1670      ReplicationPeerConfig peerConfig, boolean enabled) {
1671    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1672      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1673      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1674      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1675  }
1676
1677  @Override
1678  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1679    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1680      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1681      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1682      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1683  }
1684
1685  @Override
1686  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1687    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1688      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1689      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1690      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1691  }
1692
1693  @Override
1694  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1695    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1696      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1697      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1698      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1699  }
1700
1701  @Override
1702  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1703    return this
1704        .<ReplicationPeerConfig> newMasterCaller()
1705        .action(
1706          (controller, stub) -> this
1707              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1708                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
1709                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1710                (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1711  }
1712
1713  @Override
1714  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1715      ReplicationPeerConfig peerConfig) {
1716    return this
1717        .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1718          RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1719          (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1720          (resp) -> resp.getProcId(),
1721          new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1722  }
1723
1724  @Override
1725  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1726      Map<TableName, List<String>> tableCfs) {
1727    if (tableCfs == null) {
1728      return failedFuture(new ReplicationException("tableCfs is null"));
1729    }
1730
1731    CompletableFuture<Void> future = new CompletableFuture<Void>();
1732    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1733      if (!completeExceptionally(future, error)) {
1734        ReplicationPeerConfig newPeerConfig =
1735          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1736        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1737          if (!completeExceptionally(future, error)) {
1738            future.complete(result);
1739          }
1740        });
1741      }
1742    });
1743    return future;
1744  }
1745
1746  @Override
1747  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1748      Map<TableName, List<String>> tableCfs) {
1749    if (tableCfs == null) {
1750      return failedFuture(new ReplicationException("tableCfs is null"));
1751    }
1752
1753    CompletableFuture<Void> future = new CompletableFuture<Void>();
1754    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1755      if (!completeExceptionally(future, error)) {
1756        ReplicationPeerConfig newPeerConfig = null;
1757        try {
1758          newPeerConfig = ReplicationPeerConfigUtil
1759            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1760        } catch (ReplicationException e) {
1761          future.completeExceptionally(e);
1762          return;
1763        }
1764        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1765          if (!completeExceptionally(future, error)) {
1766            future.complete(result);
1767          }
1768        });
1769      }
1770    });
1771    return future;
1772  }
1773
1774  @Override
1775  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1776    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1777  }
1778
1779  @Override
1780  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1781    Preconditions.checkNotNull(pattern,
1782      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1783    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1784  }
1785
1786  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1787      ListReplicationPeersRequest request) {
1788    return this
1789        .<List<ReplicationPeerDescription>> newMasterCaller()
1790        .action(
1791          (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1792              List<ReplicationPeerDescription>> call(controller, stub, request,
1793                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1794                (resp) -> resp.getPeerDescList().stream()
1795                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1796                    .collect(Collectors.toList()))).call();
1797  }
1798
1799  @Override
1800  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1801    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1802    addListener(listTableDescriptors(), (tables, error) -> {
1803      if (!completeExceptionally(future, error)) {
1804        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1805        tables.forEach(table -> {
1806          Map<String, Integer> cfs = new HashMap<>();
1807          Stream.of(table.getColumnFamilies())
1808            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1809            .forEach(column -> {
1810              cfs.put(column.getNameAsString(), column.getScope());
1811            });
1812          if (!cfs.isEmpty()) {
1813            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1814          }
1815        });
1816        future.complete(replicatedTableCFs);
1817      }
1818    });
1819    return future;
1820  }
1821
1822  @Override
1823  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1824    SnapshotProtos.SnapshotDescription snapshot =
1825      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1826    try {
1827      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1828    } catch (IllegalArgumentException e) {
1829      return failedFuture(e);
1830    }
1831    CompletableFuture<Void> future = new CompletableFuture<>();
1832    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1833    addListener(this.<Long> newMasterCaller()
1834      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1835        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1836        resp -> resp.getExpectedTimeout()))
1837      .call(), (expectedTimeout, err) -> {
1838        if (err != null) {
1839          future.completeExceptionally(err);
1840          return;
1841        }
1842        TimerTask pollingTask = new TimerTask() {
1843          int tries = 0;
1844          long startTime = EnvironmentEdgeManager.currentTime();
1845          long endTime = startTime + expectedTimeout;
1846          long maxPauseTime = expectedTimeout / maxAttempts;
1847
1848          @Override
1849          public void run(Timeout timeout) throws Exception {
1850            if (EnvironmentEdgeManager.currentTime() < endTime) {
1851              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1852                if (err2 != null) {
1853                  future.completeExceptionally(err2);
1854                } else if (done) {
1855                  future.complete(null);
1856                } else {
1857                  // retry again after pauseTime.
1858                  long pauseTime =
1859                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1860                  pauseTime = Math.min(pauseTime, maxPauseTime);
1861                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1862                    TimeUnit.MILLISECONDS);
1863                }
1864              });
1865            } else {
1866              future.completeExceptionally(
1867                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1868                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1869            }
1870          }
1871        };
1872        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1873      });
1874    return future;
1875  }
1876
1877  @Override
1878  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1879    return this
1880        .<Boolean> newMasterCaller()
1881        .action(
1882          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1883            controller,
1884            stub,
1885            IsSnapshotDoneRequest.newBuilder()
1886                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1887                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1888  }
1889
1890  @Override
1891  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1892    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1893      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1894      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1895    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1896  }
1897
1898  @Override
1899  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1900      boolean restoreAcl) {
1901    CompletableFuture<Void> future = new CompletableFuture<>();
1902    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1903      if (err != null) {
1904        future.completeExceptionally(err);
1905        return;
1906      }
1907      TableName tableName = null;
1908      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1909        for (SnapshotDescription snap : snapshotDescriptions) {
1910          if (snap.getName().equals(snapshotName)) {
1911            tableName = snap.getTableName();
1912            break;
1913          }
1914        }
1915      }
1916      if (tableName == null) {
1917        future.completeExceptionally(new RestoreSnapshotException(
1918          "Unable to find the table name for snapshot=" + snapshotName));
1919        return;
1920      }
1921      final TableName finalTableName = tableName;
1922      addListener(tableExists(finalTableName), (exists, err2) -> {
1923        if (err2 != null) {
1924          future.completeExceptionally(err2);
1925        } else if (!exists) {
1926          // if table does not exist, then just clone snapshot into new table.
1927          completeConditionalOnFuture(future,
1928            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1929        } else {
1930          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1931            if (err4 != null) {
1932              future.completeExceptionally(err4);
1933            } else if (!disabled) {
1934              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1935            } else {
1936              completeConditionalOnFuture(future,
1937                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1938            }
1939          });
1940        }
1941      });
1942    });
1943    return future;
1944  }
1945
1946  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1947      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1948    if (takeFailSafeSnapshot) {
1949      CompletableFuture<Void> future = new CompletableFuture<>();
1950      // Step.1 Take a snapshot of the current state
1951      String failSafeSnapshotSnapshotNameFormat =
1952        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1953          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1954      final String failSafeSnapshotSnapshotName =
1955        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1956          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1957          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1958      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1959      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1960        if (err != null) {
1961          future.completeExceptionally(err);
1962        } else {
1963          // Step.2 Restore snapshot
1964          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1965            (void2, err2) -> {
1966              if (err2 != null) {
1967                // Step.3.a Something went wrong during the restore and try to rollback.
1968                addListener(
1969                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1970                  (void3, err3) -> {
1971                    if (err3 != null) {
1972                      future.completeExceptionally(err3);
1973                    } else {
1974                      String msg =
1975                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1976                          failSafeSnapshotSnapshotName + " succeeded.";
1977                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1978                    }
1979                  });
1980              } else {
1981                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1982                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1983                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1984                  if (err3 != null) {
1985                    LOG.error(
1986                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1987                      err3);
1988                    future.completeExceptionally(err3);
1989                  } else {
1990                    future.complete(ret3);
1991                  }
1992                });
1993              }
1994            });
1995        }
1996      });
1997      return future;
1998    } else {
1999      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
2000    }
2001  }
2002
2003  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2004      CompletableFuture<T> parentFuture) {
2005    addListener(parentFuture, (res, err) -> {
2006      if (err != null) {
2007        dependentFuture.completeExceptionally(err);
2008      } else {
2009        dependentFuture.complete(res);
2010      }
2011    });
2012  }
2013
2014  @Override
2015  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2016      boolean restoreAcl) {
2017    CompletableFuture<Void> future = new CompletableFuture<>();
2018    addListener(tableExists(tableName), (exists, err) -> {
2019      if (err != null) {
2020        future.completeExceptionally(err);
2021      } else if (exists) {
2022        future.completeExceptionally(new TableExistsException(tableName));
2023      } else {
2024        completeConditionalOnFuture(future,
2025          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2026      }
2027    });
2028    return future;
2029  }
2030
2031  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2032      boolean restoreAcl) {
2033    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2034      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2035    try {
2036      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2037    } catch (IllegalArgumentException e) {
2038      return failedFuture(e);
2039    }
2040    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2041      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2042        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2043          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2044        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2045      .call());
2046  }
2047
2048  @Override
2049  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2050    return getCompletedSnapshots(null);
2051  }
2052
2053  @Override
2054  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2055    Preconditions.checkNotNull(pattern,
2056      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2057    return getCompletedSnapshots(pattern);
2058  }
2059
2060  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2061    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2062        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2063        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2064          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2065          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2066        .call();
2067  }
2068
2069  @Override
2070  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2071    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2072        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2073    return getCompletedSnapshots(tableNamePattern, null);
2074  }
2075
2076  @Override
2077  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2078      Pattern snapshotNamePattern) {
2079    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2080        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2081    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2082        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2083    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2084  }
2085
2086  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2087      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2088    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2089    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2090      if (err != null) {
2091        future.completeExceptionally(err);
2092        return;
2093      }
2094      if (tableNames == null || tableNames.size() <= 0) {
2095        future.complete(Collections.emptyList());
2096        return;
2097      }
2098      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2099        if (err2 != null) {
2100          future.completeExceptionally(err2);
2101          return;
2102        }
2103        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2104          future.complete(Collections.emptyList());
2105          return;
2106        }
2107        future.complete(snapshotDescList.stream()
2108          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2109          .collect(Collectors.toList()));
2110      });
2111    });
2112    return future;
2113  }
2114
2115  @Override
2116  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2117    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2118  }
2119
2120  @Override
2121  public CompletableFuture<Void> deleteSnapshots() {
2122    return internalDeleteSnapshots(null, null);
2123  }
2124
2125  @Override
2126  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2127    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2128        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2129    return internalDeleteSnapshots(null, snapshotNamePattern);
2130  }
2131
2132  @Override
2133  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2134    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2135        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2136    return internalDeleteSnapshots(tableNamePattern, null);
2137  }
2138
2139  @Override
2140  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2141      Pattern snapshotNamePattern) {
2142    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2143        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2144    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2145        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2146    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2147  }
2148
2149  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2150      Pattern snapshotNamePattern) {
2151    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2152    if (tableNamePattern == null) {
2153      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2154    } else {
2155      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2156    }
2157    CompletableFuture<Void> future = new CompletableFuture<>();
2158    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2159      if (err != null) {
2160        future.completeExceptionally(err);
2161        return;
2162      }
2163      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2164        future.complete(null);
2165        return;
2166      }
2167      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2168        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2169          if (e != null) {
2170            future.completeExceptionally(e);
2171          } else {
2172            future.complete(v);
2173          }
2174        });
2175    }));
2176    return future;
2177  }
2178
2179  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2180    return this
2181        .<Void> newMasterCaller()
2182        .action(
2183          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2184            controller,
2185            stub,
2186            DeleteSnapshotRequest.newBuilder()
2187                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2188                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2189  }
2190
2191  @Override
2192  public CompletableFuture<Void> execProcedure(String signature, String instance,
2193      Map<String, String> props) {
2194    CompletableFuture<Void> future = new CompletableFuture<>();
2195    ProcedureDescription procDesc =
2196      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2197    addListener(this.<Long> newMasterCaller()
2198      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2199        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2200        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2201      .call(), (expectedTimeout, err) -> {
2202        if (err != null) {
2203          future.completeExceptionally(err);
2204          return;
2205        }
2206        TimerTask pollingTask = new TimerTask() {
2207          int tries = 0;
2208          long startTime = EnvironmentEdgeManager.currentTime();
2209          long endTime = startTime + expectedTimeout;
2210          long maxPauseTime = expectedTimeout / maxAttempts;
2211
2212          @Override
2213          public void run(Timeout timeout) throws Exception {
2214            if (EnvironmentEdgeManager.currentTime() < endTime) {
2215              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2216                if (err2 != null) {
2217                  future.completeExceptionally(err2);
2218                  return;
2219                }
2220                if (done) {
2221                  future.complete(null);
2222                } else {
2223                  // retry again after pauseTime.
2224                  long pauseTime =
2225                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2226                  pauseTime = Math.min(pauseTime, maxPauseTime);
2227                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2228                    TimeUnit.MICROSECONDS);
2229                }
2230              });
2231            } else {
2232              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2233                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2234            }
2235          }
2236        };
2237        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2238        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2239      });
2240    return future;
2241  }
2242
2243  @Override
2244  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2245      Map<String, String> props) {
2246    ProcedureDescription proDesc =
2247        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2248    return this.<byte[]> newMasterCaller()
2249        .action(
2250          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2251            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2252            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2253            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2254        .call();
2255  }
2256
2257  @Override
2258  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2259      Map<String, String> props) {
2260    ProcedureDescription proDesc =
2261        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2262    return this.<Boolean> newMasterCaller()
2263        .action((controller, stub) -> this
2264            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2265              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2266              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2267        .call();
2268  }
2269
2270  @Override
2271  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2272    return this.<Boolean> newMasterCaller().action(
2273      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2274        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2275        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2276        .call();
2277  }
2278
2279  @Override
2280  public CompletableFuture<String> getProcedures() {
2281    return this
2282        .<String> newMasterCaller()
2283        .action(
2284          (controller, stub) -> this
2285              .<GetProceduresRequest, GetProceduresResponse, String> call(
2286                controller, stub, GetProceduresRequest.newBuilder().build(),
2287                (s, c, req, done) -> s.getProcedures(c, req, done),
2288                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2289  }
2290
2291  @Override
2292  public CompletableFuture<String> getLocks() {
2293    return this
2294        .<String> newMasterCaller()
2295        .action(
2296          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2297            controller, stub, GetLocksRequest.newBuilder().build(),
2298            (s, c, req, done) -> s.getLocks(c, req, done),
2299            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2300  }
2301
2302  @Override
2303  public CompletableFuture<Void> decommissionRegionServers(
2304      List<ServerName> servers, boolean offload) {
2305    return this.<Void> newMasterCaller()
2306        .action((controller, stub) -> this
2307          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2308            controller, stub,
2309              RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2310            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2311        .call();
2312  }
2313
2314  @Override
2315  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2316    return this.<List<ServerName>> newMasterCaller()
2317        .action((controller, stub) -> this
2318          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2319            List<ServerName>> call(
2320              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2321              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2322              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2323                  .collect(Collectors.toList())))
2324        .call();
2325  }
2326
2327  @Override
2328  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2329      List<byte[]> encodedRegionNames) {
2330    return this.<Void> newMasterCaller()
2331        .action((controller, stub) ->
2332            this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2333                controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2334                    server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2335                        c, req, done), resp -> null)).call();
2336  }
2337
2338  /**
2339   * Get the region location for the passed region name. The region name may be a full region name
2340   * or encoded region name. If the region does not found, then it'll throw an
2341   * UnknownRegionException wrapped by a {@link CompletableFuture}
2342   * @param regionNameOrEncodedRegionName region name or encoded region name
2343   * @return region location, wrapped by a {@link CompletableFuture}
2344   */
2345  @VisibleForTesting
2346  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2347    if (regionNameOrEncodedRegionName == null) {
2348      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2349    }
2350    try {
2351      CompletableFuture<Optional<HRegionLocation>> future;
2352      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2353        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2354        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2355          // old format encodedName, should be meta region
2356          future = connection.registry.getMetaRegionLocations()
2357            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2358              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2359        } else {
2360          future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2361            regionNameOrEncodedRegionName);
2362        }
2363      } else {
2364        RegionInfo regionInfo =
2365          MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2366        if (regionInfo.isMetaRegion()) {
2367          future = connection.registry.getMetaRegionLocations()
2368            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2369              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2370              .findFirst());
2371        } else {
2372          future =
2373            AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2374        }
2375      }
2376
2377      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2378      addListener(future, (location, err) -> {
2379        if (err != null) {
2380          returnedFuture.completeExceptionally(err);
2381          return;
2382        }
2383        if (!location.isPresent() || location.get().getRegion() == null) {
2384          returnedFuture.completeExceptionally(
2385            new UnknownRegionException("Invalid region name or encoded region name: " +
2386              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2387        } else {
2388          returnedFuture.complete(location.get());
2389        }
2390      });
2391      return returnedFuture;
2392    } catch (IOException e) {
2393      return failedFuture(e);
2394    }
2395  }
2396
2397  /**
2398   * Get the region info for the passed region name. The region name may be a full region name or
2399   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2400   * wrapped by a {@link CompletableFuture}
2401   * @return region info, wrapped by a {@link CompletableFuture}
2402   */
2403  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2404    if (regionNameOrEncodedRegionName == null) {
2405      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2406    }
2407
2408    if (Bytes.equals(regionNameOrEncodedRegionName,
2409      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2410      Bytes.equals(regionNameOrEncodedRegionName,
2411        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2412      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2413    }
2414
2415    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2416    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2417      if (err != null) {
2418        future.completeExceptionally(err);
2419      } else {
2420        future.complete(location.getRegion());
2421      }
2422    });
2423    return future;
2424  }
2425
2426  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2427    if (numRegions < 3) {
2428      throw new IllegalArgumentException("Must create at least three regions");
2429    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2430      throw new IllegalArgumentException("Start key must be smaller than end key");
2431    }
2432    if (numRegions == 3) {
2433      return new byte[][] { startKey, endKey };
2434    }
2435    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2436    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2437      throw new IllegalArgumentException("Unable to split key range into enough regions");
2438    }
2439    return splitKeys;
2440  }
2441
2442  private void verifySplitKeys(byte[][] splitKeys) {
2443    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2444    // Verify there are no duplicate split keys
2445    byte[] lastKey = null;
2446    for (byte[] splitKey : splitKeys) {
2447      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2448        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2449      }
2450      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2451        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2452            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2453      }
2454      lastKey = splitKey;
2455    }
2456  }
2457
2458  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2459
2460    abstract void onFinished();
2461
2462    abstract void onError(Throwable error);
2463
2464    @Override
2465    public void accept(Void v, Throwable error) {
2466      if (error != null) {
2467        onError(error);
2468        return;
2469      }
2470      onFinished();
2471    }
2472  }
2473
2474  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2475    protected final TableName tableName;
2476
2477    TableProcedureBiConsumer(TableName tableName) {
2478      this.tableName = tableName;
2479    }
2480
2481    abstract String getOperationType();
2482
2483    String getDescription() {
2484      return "Operation: " + getOperationType() + ", " + "Table Name: "
2485          + tableName.getNameWithNamespaceInclAsString();
2486    }
2487
2488    @Override
2489    void onFinished() {
2490      LOG.info(getDescription() + " completed");
2491    }
2492
2493    @Override
2494    void onError(Throwable error) {
2495      LOG.info(getDescription() + " failed with " + error.getMessage());
2496    }
2497  }
2498
2499  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2500    protected final String namespaceName;
2501
2502    NamespaceProcedureBiConsumer(String namespaceName) {
2503      this.namespaceName = namespaceName;
2504    }
2505
2506    abstract String getOperationType();
2507
2508    String getDescription() {
2509      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2510    }
2511
2512    @Override
2513    void onFinished() {
2514      LOG.info(getDescription() + " completed");
2515    }
2516
2517    @Override
2518    void onError(Throwable error) {
2519      LOG.info(getDescription() + " failed with " + error.getMessage());
2520    }
2521  }
2522
2523  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2524
2525    CreateTableProcedureBiConsumer(TableName tableName) {
2526      super(tableName);
2527    }
2528
2529    @Override
2530    String getOperationType() {
2531      return "CREATE";
2532    }
2533  }
2534
2535  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2536
2537    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2538      super(tableName);
2539    }
2540
2541    @Override
2542    String getOperationType() {
2543      return "ENABLE";
2544    }
2545  }
2546
2547  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2548
2549    DeleteTableProcedureBiConsumer(TableName tableName) {
2550      super(tableName);
2551    }
2552
2553    @Override
2554    String getOperationType() {
2555      return "DELETE";
2556    }
2557
2558    @Override
2559    void onFinished() {
2560      connection.getLocator().clearCache(this.tableName);
2561      super.onFinished();
2562    }
2563  }
2564
2565  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2566
2567    TruncateTableProcedureBiConsumer(TableName tableName) {
2568      super(tableName);
2569    }
2570
2571    @Override
2572    String getOperationType() {
2573      return "TRUNCATE";
2574    }
2575  }
2576
2577  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2578
2579    EnableTableProcedureBiConsumer(TableName tableName) {
2580      super(tableName);
2581    }
2582
2583    @Override
2584    String getOperationType() {
2585      return "ENABLE";
2586    }
2587  }
2588
2589  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2590
2591    DisableTableProcedureBiConsumer(TableName tableName) {
2592      super(tableName);
2593    }
2594
2595    @Override
2596    String getOperationType() {
2597      return "DISABLE";
2598    }
2599  }
2600
2601  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2602
2603    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2604      super(tableName);
2605    }
2606
2607    @Override
2608    String getOperationType() {
2609      return "ADD_COLUMN_FAMILY";
2610    }
2611  }
2612
2613  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2614
2615    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2616      super(tableName);
2617    }
2618
2619    @Override
2620    String getOperationType() {
2621      return "DELETE_COLUMN_FAMILY";
2622    }
2623  }
2624
2625  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2626
2627    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2628      super(tableName);
2629    }
2630
2631    @Override
2632    String getOperationType() {
2633      return "MODIFY_COLUMN_FAMILY";
2634    }
2635  }
2636
2637  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2638
2639    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2640      super(namespaceName);
2641    }
2642
2643    @Override
2644    String getOperationType() {
2645      return "CREATE_NAMESPACE";
2646    }
2647  }
2648
2649  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2650
2651    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2652      super(namespaceName);
2653    }
2654
2655    @Override
2656    String getOperationType() {
2657      return "DELETE_NAMESPACE";
2658    }
2659  }
2660
2661  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2662
2663    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2664      super(namespaceName);
2665    }
2666
2667    @Override
2668    String getOperationType() {
2669      return "MODIFY_NAMESPACE";
2670    }
2671  }
2672
2673  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2674
2675    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2676      super(tableName);
2677    }
2678
2679    @Override
2680    String getOperationType() {
2681      return "MERGE_REGIONS";
2682    }
2683  }
2684
2685  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2686
2687    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2688      super(tableName);
2689    }
2690
2691    @Override
2692    String getOperationType() {
2693      return "SPLIT_REGION";
2694    }
2695  }
2696
2697  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2698    private final String peerId;
2699    private final Supplier<String> getOperation;
2700
2701    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2702      this.peerId = peerId;
2703      this.getOperation = getOperation;
2704    }
2705
2706    String getDescription() {
2707      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2708    }
2709
2710    @Override
2711    void onFinished() {
2712      LOG.info(getDescription() + " completed");
2713    }
2714
2715    @Override
2716    void onError(Throwable error) {
2717      LOG.info(getDescription() + " failed with " + error.getMessage());
2718    }
2719  }
2720
2721  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2722    CompletableFuture<Void> future = new CompletableFuture<>();
2723    addListener(procFuture, (procId, error) -> {
2724      if (error != null) {
2725        future.completeExceptionally(error);
2726        return;
2727      }
2728      getProcedureResult(procId, future, 0);
2729    });
2730    return future;
2731  }
2732
2733  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2734    addListener(
2735      this.<GetProcedureResultResponse> newMasterCaller()
2736        .action((controller, stub) -> this
2737          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2738            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2739            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2740        .call(),
2741      (response, error) -> {
2742        if (error != null) {
2743          LOG.warn("failed to get the procedure result procId={}", procId,
2744            ConnectionUtils.translateException(error));
2745          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2746            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2747          return;
2748        }
2749        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2750          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2751            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2752          return;
2753        }
2754        if (response.hasException()) {
2755          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2756          future.completeExceptionally(ioe);
2757        } else {
2758          future.complete(null);
2759        }
2760      });
2761  }
2762
2763  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2764    CompletableFuture<T> future = new CompletableFuture<>();
2765    future.completeExceptionally(error);
2766    return future;
2767  }
2768
2769  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2770    if (error != null) {
2771      future.completeExceptionally(error);
2772      return true;
2773    }
2774    return false;
2775  }
2776
2777  @Override
2778  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2779    return getClusterMetrics(EnumSet.allOf(Option.class));
2780  }
2781
2782  @Override
2783  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2784    return this
2785        .<ClusterMetrics> newMasterCaller()
2786        .action(
2787          (controller, stub) -> this
2788              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2789                stub, RequestConverter.buildGetClusterStatusRequest(options),
2790                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2791                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2792  }
2793
2794  @Override
2795  public CompletableFuture<Void> shutdown() {
2796    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2797      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2798        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2799        resp -> null))
2800      .call();
2801  }
2802
2803  @Override
2804  public CompletableFuture<Void> stopMaster() {
2805    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2806      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2807        controller, stub, StopMasterRequest.newBuilder().build(),
2808        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2809      .call();
2810  }
2811
2812  @Override
2813  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2814    StopServerRequest request = RequestConverter
2815      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2816    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2817      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2818        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2819        resp -> null))
2820      .serverName(serverName).call();
2821  }
2822
2823  @Override
2824  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2825    return this
2826        .<Void> newAdminCaller()
2827        .action(
2828          (controller, stub) -> this
2829              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2830                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2831                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2832        .serverName(serverName).call();
2833  }
2834
2835  @Override
2836  public CompletableFuture<Void> updateConfiguration() {
2837    CompletableFuture<Void> future = new CompletableFuture<Void>();
2838    addListener(
2839      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2840      (status, err) -> {
2841        if (err != null) {
2842          future.completeExceptionally(err);
2843        } else {
2844          List<CompletableFuture<Void>> futures = new ArrayList<>();
2845          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2846          futures.add(updateConfiguration(status.getMasterName()));
2847          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2848          addListener(
2849            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2850            (result, err2) -> {
2851              if (err2 != null) {
2852                future.completeExceptionally(err2);
2853              } else {
2854                future.complete(result);
2855              }
2856            });
2857        }
2858      });
2859    return future;
2860  }
2861
2862  @Override
2863  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2864    return this
2865        .<Void> newAdminCaller()
2866        .action(
2867          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2868            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2869            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2870        .serverName(serverName).call();
2871  }
2872
2873  @Override
2874  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2875    return this
2876        .<Void> newAdminCaller()
2877        .action(
2878          (controller, stub) -> this
2879              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2880                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2881                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2882        .serverName(serverName).call();
2883  }
2884
2885  @Override
2886  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2887    return this
2888        .<List<SecurityCapability>> newMasterCaller()
2889        .action(
2890          (controller, stub) -> this
2891              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
2892                  call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
2893                    (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2894                    (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2895        .call();
2896  }
2897
2898  @Override
2899  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2900    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2901  }
2902
2903  @Override
2904  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2905      TableName tableName) {
2906    Preconditions.checkNotNull(tableName,
2907      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2908    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2909  }
2910
2911  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2912      ServerName serverName) {
2913    return this.<List<RegionMetrics>> newAdminCaller()
2914        .action((controller, stub) -> this
2915            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2916              adminCall(controller, stub, request, (s, c, req, done) ->
2917                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2918        .serverName(serverName).call();
2919  }
2920
2921  @Override
2922  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2923    return this
2924        .<Boolean> newMasterCaller()
2925        .action(
2926          (controller, stub) -> this
2927              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2928                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2929                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2930                resp -> resp.getInMaintenanceMode())).call();
2931  }
2932
  /**
   * Determines the compaction state of a table.
   * <p>
   * For {@code MOB}, the active master is looked up through the connection registry and asked for
   * the region info of the table's MOB region; the result is the compaction state in that
   * response, or {@code NONE} when the response carries none.
   * <p>
   * For {@code NORMAL}, every region location that has a server name and a non-offline region is
   * queried through {@link #getCompactionStateForRegion(byte[])}. Any failure fails the returned
   * future; a region reporting {@code MAJOR_AND_MINOR} completes it immediately. Otherwise the
   * collected states are combined once all queries are done: {@code MAJOR_AND_MINOR} if both a
   * MAJOR and a MINOR region were seen, else MAJOR, MINOR or NONE.
   * @param tableName the table to inspect
   * @param compactType which kind of compaction to report on
   * @return a future holding the table's compaction state
   * @throws IllegalArgumentException synchronously, if {@code compactType} is neither MOB nor
   *           NORMAL
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
      CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          // Ask the active master for the MOB region's info, requesting the compaction state.
          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this
              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
                controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // Collects the per-region states that did not settle the result on their own.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // If any region compaction state is MAJOR_AND_MINOR
                // the table compaction state is MAJOR_AND_MINOR, too.
                if (err2 != null) {
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If future not completed, check all regions's compaction state
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      // A MINOR region was seen earlier, so the table has both kinds.
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      // A MAJOR region was seen earlier, so the table has both kinds.
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // Only reached with an open future when MAJOR and MINOR were never both seen.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }
3028
3029  @Override
3030  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3031    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3032    addListener(getRegionLocation(regionName), (location, err) -> {
3033      if (err != null) {
3034        future.completeExceptionally(err);
3035        return;
3036      }
3037      ServerName serverName = location.getServerName();
3038      if (serverName == null) {
3039        future
3040          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3041        return;
3042      }
3043      addListener(
3044        this.<GetRegionInfoResponse> newAdminCaller()
3045          .action((controller, stub) -> this
3046            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3047              controller, stub,
3048              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3049                true),
3050              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3051          .serverName(serverName).call(),
3052        (resp2, err2) -> {
3053          if (err2 != null) {
3054            future.completeExceptionally(err2);
3055          } else {
3056            if (resp2.hasCompactionState()) {
3057              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3058            } else {
3059              future.complete(CompactionState.NONE);
3060            }
3061          }
3062        });
3063    });
3064    return future;
3065  }
3066
3067  @Override
3068  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3069    MajorCompactionTimestampRequest request =
3070        MajorCompactionTimestampRequest.newBuilder()
3071            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3072    return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3073        this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3074            call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3075                c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3076  }
3077
3078  @Override
3079  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3080      byte[] regionName) {
3081    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3082    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3083    addListener(getRegionInfo(regionName), (region, err) -> {
3084      if (err != null) {
3085        future.completeExceptionally(err);
3086        return;
3087      }
3088      MajorCompactionTimestampForRegionRequest.Builder builder =
3089        MajorCompactionTimestampForRegionRequest.newBuilder();
3090      builder.setRegion(
3091        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3092      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3093        .<MajorCompactionTimestampForRegionRequest,
3094        MajorCompactionTimestampResponse, Optional<Long>> call(
3095          controller, stub, builder.build(),
3096          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3097          ProtobufUtil::toOptionalTimestamp))
3098        .call(), (timestamp, err2) -> {
3099          if (err2 != null) {
3100            future.completeExceptionally(err2);
3101          } else {
3102            future.complete(timestamp);
3103          }
3104        });
3105    });
3106    return future;
3107  }
3108
3109  @Override
3110  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3111      List<String> serverNamesList) {
3112    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3113    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3114      if (err != null) {
3115        future.completeExceptionally(err);
3116        return;
3117      }
3118      // Accessed by multiple threads.
3119      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3120      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3121      serverNames.stream().forEach(serverName -> {
3122        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3123          if (err2 != null) {
3124            future.completeExceptionally(unwrapCompletionException(err2));
3125          } else {
3126            serverStates.put(serverName, serverState);
3127          }
3128        }));
3129      });
3130      addListener(
3131        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3132        (ret, err3) -> {
3133          if (!future.isCompletedExceptionally()) {
3134            if (err3 != null) {
3135              future.completeExceptionally(err3);
3136            } else {
3137              future.complete(serverStates);
3138            }
3139          }
3140        });
3141    });
3142    return future;
3143  }
3144
3145  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3146    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3147    if (serverNamesList.isEmpty()) {
3148      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3149        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3150      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3151        if (err != null) {
3152          future.completeExceptionally(err);
3153        } else {
3154          future.complete(clusterMetrics.getServersName());
3155        }
3156      });
3157      return future;
3158    } else {
3159      List<ServerName> serverList = new ArrayList<>();
3160      for (String regionServerName : serverNamesList) {
3161        ServerName serverName = null;
3162        try {
3163          serverName = ServerName.valueOf(regionServerName);
3164        } catch (Exception e) {
3165          future.completeExceptionally(
3166            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3167        }
3168        if (serverName == null) {
3169          future.completeExceptionally(
3170            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3171        } else {
3172          serverList.add(serverName);
3173        }
3174      }
3175      future.complete(serverList);
3176    }
3177    return future;
3178  }
3179
3180  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3181    return this
3182        .<Boolean>newAdminCaller()
3183        .serverName(serverName)
3184        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3185            Boolean>adminCall(controller, stub,
3186            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3187            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3188  }
3189
3190  @Override
3191  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3192    return this.<Boolean> newMasterCaller()
3193      .action((controller, stub) -> this
3194        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3195          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3196          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3197          (resp) -> resp.getPrevBalanceValue()))
3198      .call();
3199  }
3200
3201  @Override
3202  public CompletableFuture<Boolean> balance(boolean forcible) {
3203    return this
3204        .<Boolean> newMasterCaller()
3205        .action(
3206          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3207            stub, RequestConverter.buildBalanceRequest(forcible),
3208            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3209  }
3210
3211  @Override
3212  public CompletableFuture<Boolean> isBalancerEnabled() {
3213    return this
3214        .<Boolean> newMasterCaller()
3215        .action((controller, stub) ->
3216              this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3217            stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3218                  -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3219  }
3220
3221  @Override
3222  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3223    return this
3224        .<Boolean> newMasterCaller()
3225        .action(
3226          (controller, stub) -> this
3227              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3228                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3229                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3230                    .getPrevNormalizerValue())).call();
3231  }
3232
3233  @Override
3234  public CompletableFuture<Boolean> isNormalizerEnabled() {
3235    return this
3236        .<Boolean> newMasterCaller()
3237        .action(
3238          (controller, stub) -> this
3239              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3240                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3241                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3242                (resp) -> resp.getEnabled())).call();
3243  }
3244
3245  @Override
3246  public CompletableFuture<Boolean> normalize() {
3247    return this
3248        .<Boolean> newMasterCaller()
3249        .action(
3250          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3251            controller, stub, RequestConverter.buildNormalizeRequest(),
3252            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3253        .call();
3254  }
3255
3256  @Override
3257  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3258    return this
3259        .<Boolean> newMasterCaller()
3260        .action(
3261          (controller, stub) -> this
3262              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3263                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3264                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3265                    .getPrevValue())).call();
3266  }
3267
3268  @Override
3269  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3270    return this
3271        .<Boolean> newMasterCaller()
3272        .action(
3273          (controller, stub) -> this
3274              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3275                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3276                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3277        .call();
3278  }
3279
3280  @Override
3281  public CompletableFuture<Boolean> runCleanerChore() {
3282    return this
3283        .<Boolean> newMasterCaller()
3284        .action(
3285          (controller, stub) -> this
3286              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3287                RequestConverter.buildRunCleanerChoreRequest(),
3288                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3289                (resp) -> resp.getCleanerChoreRan())).call();
3290  }
3291
3292  @Override
3293  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3294    return this
3295        .<Boolean> newMasterCaller()
3296        .action(
3297          (controller, stub) -> this
3298              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3299                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3300                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3301                    .getPrevValue())).call();
3302  }
3303
3304  @Override
3305  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3306    return this
3307        .<Boolean> newMasterCaller()
3308        .action(
3309          (controller, stub) -> this
3310              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3311                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3312                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3313                    .getValue())).call();
3314  }
3315
3316  @Override
3317  public CompletableFuture<Integer> runCatalogJanitor() {
3318    return this
3319        .<Integer> newMasterCaller()
3320        .action(
3321          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3322            controller, stub, RequestConverter.buildCatalogScanRequest(),
3323            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3324        .call();
3325  }
3326
3327  @Override
3328  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3329      ServiceCaller<S, R> callable) {
3330    MasterCoprocessorRpcChannelImpl channel =
3331        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3332    S stub = stubMaker.apply(channel);
3333    CompletableFuture<R> future = new CompletableFuture<>();
3334    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3335    callable.call(stub, controller, resp -> {
3336      if (controller.failed()) {
3337        future.completeExceptionally(controller.getFailed());
3338      } else {
3339        future.complete(resp);
3340      }
3341    });
3342    return future;
3343  }
3344
3345  @Override
3346  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3347      ServiceCaller<S, R> callable, ServerName serverName) {
3348    RegionServerCoprocessorRpcChannelImpl channel =
3349        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3350          serverName));
3351    S stub = stubMaker.apply(channel);
3352    CompletableFuture<R> future = new CompletableFuture<>();
3353    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3354    callable.call(stub, controller, resp -> {
3355      if (controller.failed()) {
3356        future.completeExceptionally(controller.getFailed());
3357      } else {
3358        future.complete(resp);
3359      }
3360    });
3361    return future;
3362  }
3363
3364  @Override
3365  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3366    return this.<List<ServerName>> newMasterCaller()
3367      .action((controller, stub) -> this
3368        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3369          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3370          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3371          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3372      .call();
3373  }
3374
3375  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3376    return this.connection.callerFactory.<T> serverRequest()
3377      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3378      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3379      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3380      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3381  }
3382
3383  @Override
3384  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3385    if (tableName == null) {
3386      return failedFuture(new IllegalArgumentException("Table name is null"));
3387    }
3388    CompletableFuture<Void> future = new CompletableFuture<>();
3389    addListener(tableExists(tableName), (exist, err) -> {
3390      if (err != null) {
3391        future.completeExceptionally(err);
3392        return;
3393      }
3394      if (!exist) {
3395        future.completeExceptionally(new TableNotFoundException(
3396          "Table '" + tableName.getNameAsString() + "' does not exists."));
3397        return;
3398      }
3399      addListener(getTableSplits(tableName), (splits, err1) -> {
3400        if (err1 != null) {
3401          future.completeExceptionally(err1);
3402        } else {
3403          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3404            if (err2 != null) {
3405              future.completeExceptionally(err2);
3406            } else {
3407              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3408                if (err3 != null) {
3409                  future.completeExceptionally(err3);
3410                } else {
3411                  future.complete(result3);
3412                }
3413              });
3414            }
3415          });
3416        }
3417      });
3418    });
3419    return future;
3420  }
3421
3422  @Override
3423  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3424    if (tableName == null) {
3425      return failedFuture(new IllegalArgumentException("Table name is null"));
3426    }
3427    CompletableFuture<Void> future = new CompletableFuture<>();
3428    addListener(tableExists(tableName), (exist, err) -> {
3429      if (err != null) {
3430        future.completeExceptionally(err);
3431        return;
3432      }
3433      if (!exist) {
3434        future.completeExceptionally(new TableNotFoundException(
3435          "Table '" + tableName.getNameAsString() + "' does not exists."));
3436        return;
3437      }
3438      addListener(setTableReplication(tableName, false), (result, err2) -> {
3439        if (err2 != null) {
3440          future.completeExceptionally(err2);
3441        } else {
3442          future.complete(result);
3443        }
3444      });
3445    });
3446    return future;
3447  }
3448
3449  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3450    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3451    addListener(
3452      getRegions(tableName).thenApply(regions -> regions.stream()
3453        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3454      (regions, err2) -> {
3455        if (err2 != null) {
3456          future.completeExceptionally(err2);
3457          return;
3458        }
3459        if (regions.size() == 1) {
3460          future.complete(null);
3461        } else {
3462          byte[][] splits = new byte[regions.size() - 1][];
3463          for (int i = 1; i < regions.size(); i++) {
3464            splits[i - 1] = regions.get(i).getStartKey();
3465          }
3466          future.complete(splits);
3467        }
3468      });
3469    return future;
3470  }
3471
3472  /**
3473   * Connect to peer and check the table descriptor on peer:
3474   * <ol>
3475   * <li>Create the same table on peer when not exist.</li>
3476   * <li>Throw an exception if the table already has replication enabled on any of the column
3477   * families.</li>
3478   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3479   * </ol>
3480   * @param tableName name of the table to sync to the peer
3481   * @param splits table split keys
3482   */
3483  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3484      byte[][] splits) {
3485    CompletableFuture<Void> future = new CompletableFuture<>();
3486    addListener(listReplicationPeers(), (peers, err) -> {
3487      if (err != null) {
3488        future.completeExceptionally(err);
3489        return;
3490      }
3491      if (peers == null || peers.size() <= 0) {
3492        future.completeExceptionally(
3493          new IllegalArgumentException("Found no peer cluster for replication."));
3494        return;
3495      }
3496      List<CompletableFuture<Void>> futures = new ArrayList<>();
3497      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3498        .forEach(peer -> {
3499          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3500        });
3501      addListener(
3502        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3503        (result, err2) -> {
3504          if (err2 != null) {
3505            future.completeExceptionally(err2);
3506          } else {
3507            future.complete(result);
3508          }
3509        });
3510    });
3511    return future;
3512  }
3513
3514  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3515      ReplicationPeerDescription peer) {
3516    Configuration peerConf = null;
3517    try {
3518      peerConf =
3519        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3520    } catch (IOException e) {
3521      return failedFuture(e);
3522    }
3523    CompletableFuture<Void> future = new CompletableFuture<>();
3524    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3525      if (err != null) {
3526        future.completeExceptionally(err);
3527        return;
3528      }
3529      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3530        if (err1 != null) {
3531          future.completeExceptionally(err1);
3532          return;
3533        }
3534        AsyncAdmin peerAdmin = conn.getAdmin();
3535        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3536          if (err2 != null) {
3537            future.completeExceptionally(err2);
3538            return;
3539          }
3540          if (!exist) {
3541            CompletableFuture<Void> createTableFuture = null;
3542            if (splits == null) {
3543              createTableFuture = peerAdmin.createTable(tableDesc);
3544            } else {
3545              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3546            }
3547            addListener(createTableFuture, (result, err3) -> {
3548              if (err3 != null) {
3549                future.completeExceptionally(err3);
3550              } else {
3551                future.complete(result);
3552              }
3553            });
3554          } else {
3555            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3556              (result, err4) -> {
3557                if (err4 != null) {
3558                  future.completeExceptionally(err4);
3559                } else {
3560                  future.complete(result);
3561                }
3562              });
3563          }
3564        });
3565      });
3566    });
3567    return future;
3568  }
3569
3570  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3571      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3572    CompletableFuture<Void> future = new CompletableFuture<>();
3573    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3574      if (err != null) {
3575        future.completeExceptionally(err);
3576        return;
3577      }
3578      if (peerTableDesc == null) {
3579        future.completeExceptionally(
3580          new IllegalArgumentException("Failed to get table descriptor for table " +
3581            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3582        return;
3583      }
3584      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3585        future.completeExceptionally(new IllegalArgumentException(
3586          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3587            ", but the table descriptors are not same when compared with source cluster." +
3588            " Thus can not enable the table's replication switch."));
3589        return;
3590      }
3591      future.complete(null);
3592    });
3593    return future;
3594  }
3595
3596  /**
3597   * Set the table's replication switch if the table's replication switch is already not set.
3598   * @param tableName name of the table
3599   * @param enableRep is replication switch enable or disable
3600   */
3601  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3602    CompletableFuture<Void> future = new CompletableFuture<>();
3603    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3604      if (err != null) {
3605        future.completeExceptionally(err);
3606        return;
3607      }
3608      if (!tableDesc.matchReplicationScope(enableRep)) {
3609        int scope =
3610          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3611        TableDescriptor newTableDesc =
3612          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3613        addListener(modifyTable(newTableDesc), (result, err2) -> {
3614          if (err2 != null) {
3615            future.completeExceptionally(err2);
3616          } else {
3617            future.complete(result);
3618          }
3619        });
3620      } else {
3621        future.complete(null);
3622      }
3623    });
3624    return future;
3625  }
3626
3627  @Override
3628  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3629    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3630    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3631      if (err != null) {
3632        future.completeExceptionally(err);
3633        return;
3634      }
3635      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3636        locations.stream().filter(l -> l.getRegion() != null)
3637          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3638          .collect(Collectors.groupingBy(l -> l.getServerName(),
3639            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3640      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3641      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3642      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3643        futures
3644          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3645            if (err2 != null) {
3646              future.completeExceptionally(unwrapCompletionException(err2));
3647            } else {
3648              aggregator.append(stats);
3649            }
3650          }));
3651      }
3652      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3653        (ret, err3) -> {
3654          if (err3 != null) {
3655            future.completeExceptionally(unwrapCompletionException(err3));
3656          } else {
3657            future.complete(aggregator.sum());
3658          }
3659        });
3660    });
3661    return future;
3662  }
3663
3664  @Override
3665  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3666      boolean preserveSplits) {
3667    CompletableFuture<Void> future = new CompletableFuture<>();
3668    addListener(tableExists(tableName), (exist, err) -> {
3669      if (err != null) {
3670        future.completeExceptionally(err);
3671        return;
3672      }
3673      if (!exist) {
3674        future.completeExceptionally(new TableNotFoundException(tableName));
3675        return;
3676      }
3677      addListener(tableExists(newTableName), (exist1, err1) -> {
3678        if (err1 != null) {
3679          future.completeExceptionally(err1);
3680          return;
3681        }
3682        if (exist1) {
3683          future.completeExceptionally(new TableExistsException(newTableName));
3684          return;
3685        }
3686        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3687          if (err2 != null) {
3688            future.completeExceptionally(err2);
3689            return;
3690          }
3691          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3692          if (preserveSplits) {
3693            addListener(getTableSplits(tableName), (splits, err3) -> {
3694              if (err3 != null) {
3695                future.completeExceptionally(err3);
3696              } else {
3697                addListener(
3698                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3699                  (result, err4) -> {
3700                    if (err4 != null) {
3701                      future.completeExceptionally(err4);
3702                    } else {
3703                      future.complete(result);
3704                    }
3705                  });
3706              }
3707            });
3708          } else {
3709            addListener(createTable(newTableDesc), (result, err5) -> {
3710              if (err5 != null) {
3711                future.completeExceptionally(err5);
3712              } else {
3713                future.complete(result);
3714              }
3715            });
3716          }
3717        });
3718      });
3719    });
3720    return future;
3721  }
3722
3723  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3724      List<RegionInfo> hris) {
3725    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3726      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3727        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3728        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3729        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3730      .serverName(serverName).call();
3731  }
3732
3733  @Override
3734  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3735    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3736        .action((controller, stub) -> this
3737            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3738              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3739              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3740              resp -> resp.getPreviousRpcThrottleEnabled()))
3741        .call();
3742    return future;
3743  }
3744
3745  @Override
3746  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3747    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3748        .action((controller, stub) -> this
3749            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3750              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3751              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3752              resp -> resp.getRpcThrottleEnabled()))
3753        .call();
3754    return future;
3755  }
3756
3757  @Override
3758  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3759    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3760        .action((controller, stub) -> this
3761            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3762              controller, stub,
3763              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3764                  .build(),
3765              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3766              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3767        .call();
3768    return future;
3769  }
3770
3771  @Override
3772  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3773    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3774      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3775      Map<TableName, Long>> call(controller, stub,
3776        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3777        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3778        resp -> resp.getSizesList().stream().collect(Collectors
3779          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3780      .call();
3781  }
3782
3783  @Override
3784  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3785      ServerName serverName) {
3786    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3787      .action((controller, stub) -> this
3788        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3789        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3790          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3791          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3792          resp -> resp.getSnapshotsList().stream()
3793            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3794              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3795      .serverName(serverName).call();
3796  }
3797
3798  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3799      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3800    return this.<SpaceQuotaSnapshot> newMasterCaller()
3801      .action((controller, stub) -> this
3802        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3803          RequestConverter.buildGetQuotaStatesRequest(),
3804          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3805      .call();
3806  }
3807
3808  @Override
3809  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3810    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3811      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3812      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3813  }
3814
3815  @Override
3816  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3817    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3818    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3819      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3820      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3821  }
3822
3823  @Override
3824  public CompletableFuture<Void> grant(UserPermission userPermission,
3825      boolean mergeExistingPermissions) {
3826    return this.<Void> newMasterCaller()
3827        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3828          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3829          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3830        .call();
3831  }
3832
3833  @Override
3834  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3835    return this.<Void> newMasterCaller()
3836        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3837          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3838          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3839        .call();
3840  }
3841
3842  @Override
3843  public CompletableFuture<List<UserPermission>>
3844      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3845    return this.<List<UserPermission>> newMasterCaller().action((controller,
3846        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3847            List<UserPermission>> call(controller, stub,
3848              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3849              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3850              resp -> resp.getUserPermissionList().stream()
3851                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3852                .collect(Collectors.toList())))
3853        .call();
3854  }
3855
3856  @Override
3857  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3858      List<Permission> permissions) {
3859    return this.<List<Boolean>> newMasterCaller()
3860        .action((controller, stub) -> this
3861            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3862              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3863              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3864              resp -> resp.getHasUserPermissionList()))
3865        .call();
3866  }
3867
3868  @Override
3869  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3870      final boolean sync) {
3871    return this.<Boolean>newMasterCaller()
3872        .action((controller, stub) -> this
3873            .call(controller, stub,
3874                RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3875                MasterService.Interface::switchSnapshotCleanup,
3876                SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
3877        .call();
3878  }
3879
3880  @Override
3881  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3882    return this.<Boolean>newMasterCaller()
3883        .action((controller, stub) -> this
3884            .call(controller, stub,
3885                RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3886                MasterService.Interface::isSnapshotCleanupEnabled,
3887                IsSnapshotCleanupEnabledResponse::getEnabled))
3888        .call();
3889  }
3890
3891  @Override
3892  public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
3893      @Nullable final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
3894    if (CollectionUtils.isEmpty(serverNames)) {
3895      return CompletableFuture.completedFuture(Collections.emptyList());
3896    }
3897    if (logQueryFilter.getType() == null
3898      || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
3899      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3900        .map((ServerName serverName) ->
3901          getSlowLogResponseFromServer(serverName, logQueryFilter))
3902        .map(CompletableFuture::join)
3903        .flatMap(List::stream)
3904        .collect(Collectors.toList()));
3905    } else {
3906      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3907        .map((ServerName serverName) ->
3908          getLargeLogResponseFromServer(serverName, logQueryFilter))
3909        .map(CompletableFuture::join)
3910        .flatMap(List::stream)
3911        .collect(Collectors.toList()));
3912    }
3913  }
3914
3915  private CompletableFuture<List<OnlineLogRecord>> getLargeLogResponseFromServer(
3916      final ServerName serverName, final LogQueryFilter logQueryFilter) {
3917    return this.<List<OnlineLogRecord>>newAdminCaller()
3918      .action((controller, stub) -> this
3919        .adminCall(
3920          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
3921          AdminService.Interface::getLargeLogResponses,
3922          ProtobufUtil::toSlowLogPayloads))
3923      .serverName(serverName).call();
3924  }
3925
3926  private CompletableFuture<List<OnlineLogRecord>> getSlowLogResponseFromServer(
3927    final ServerName serverName, final LogQueryFilter logQueryFilter) {
3928    return this.<List<OnlineLogRecord>>newAdminCaller()
3929      .action((controller, stub) -> this
3930        .adminCall(
3931          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
3932          AdminService.Interface::getSlowLogResponses,
3933          ProtobufUtil::toSlowLogPayloads))
3934      .serverName(serverName).call();
3935  }
3936
3937  @Override
3938  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
3939      @Nullable Set<ServerName> serverNames) {
3940    if (CollectionUtils.isEmpty(serverNames)) {
3941      return CompletableFuture.completedFuture(Collections.emptyList());
3942    }
3943    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
3944      .map(this::clearSlowLogsResponses)
3945      .collect(Collectors.toList());
3946    return convertToFutureOfList(clearSlowLogResponseList);
3947  }
3948
3949  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
3950    return this.<Boolean>newAdminCaller()
3951      .action(((controller, stub) -> this
3952        .adminCall(
3953          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
3954          AdminService.Interface::clearSlowLogsResponses,
3955          ProtobufUtil::toClearSlowLogPayload))
3956      ).serverName(serverName).call();
3957  }
3958
3959  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
3960    List<CompletableFuture<T>> futures) {
3961    CompletableFuture<Void> allDoneFuture =
3962      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
3963    return allDoneFuture.thenApply(v ->
3964      futures.stream()
3965        .map(CompletableFuture::join)
3966        .collect(Collectors.toList())
3967    );
3968  }
3969
3970}