001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import com.google.protobuf.Message;
026import com.google.protobuf.RpcChannel;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MetaTableAccessor;
060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
061import org.apache.hadoop.hbase.NamespaceDescriptor;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.UnknownRegionException;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.Scan.ReadType;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.DeserializationException;
080import org.apache.hadoop.hbase.ipc.HBaseRpcController;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.replication.SyncReplicationState;
089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
090import org.apache.hadoop.hbase.security.access.Permission;
091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
092import org.apache.hadoop.hbase.security.access.UserPermission;
093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.util.Bytes;
097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
098import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
107import org.apache.hbase.thirdparty.io.netty.util.Timeout;
108import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
109
110import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
111import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
301
302/**
303 * The implementation of AsyncAdmin.
304 * <p>
305 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
306 * be finished inside the rpc framework thread, which means that the callbacks registered to the
307 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
308 * this class should not try to do time consuming tasks in the callbacks.
309 * @since 2.0.0
310 * @see AsyncHBaseAdmin
311 * @see AsyncConnection#getAdmin()
312 * @see AsyncConnection#getAdminBuilder()
313 */
314@InterfaceAudience.Private
315class RawAsyncHBaseAdmin implements AsyncAdmin {
316  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
317
318  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
319
320  private final AsyncConnectionImpl connection;
321
322  private final HashedWheelTimer retryTimer;
323
324  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
325
326  private final long rpcTimeoutNs;
327
328  private final long operationTimeoutNs;
329
330  private final long pauseNs;
331
332  private final long pauseForCQTBENs;
333
334  private final int maxAttempts;
335
336  private final int startLogErrorsCnt;
337
338  private final NonceGenerator ng;
339
340  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
341      AsyncAdminBuilderBase builder) {
342    this.connection = connection;
343    this.retryTimer = retryTimer;
344    this.metaTable = connection.getTable(META_TABLE_NAME);
345    this.rpcTimeoutNs = builder.rpcTimeoutNs;
346    this.operationTimeoutNs = builder.operationTimeoutNs;
347    this.pauseNs = builder.pauseNs;
348    if (builder.pauseForCQTBENs < builder.pauseNs) {
349      LOG.warn(
350        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
351          " the normal pause value {} ms, use the greater one instead",
352        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
353        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
354      this.pauseForCQTBENs = builder.pauseNs;
355    } else {
356      this.pauseForCQTBENs = builder.pauseForCQTBENs;
357    }
358    this.maxAttempts = builder.maxAttempts;
359    this.startLogErrorsCnt = builder.startLogErrorsCnt;
360    this.ng = connection.getNonceGenerator();
361  }
362
363  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
364    return this.connection.callerFactory.<T> masterRequest()
365      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
366      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
367      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
368      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
369  }
370
371  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
372    return this.connection.callerFactory.<T> adminRequest()
373      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
374      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
375      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
376      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
377  }
378
379  @FunctionalInterface
380  private interface MasterRpcCall<RESP, REQ> {
381    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
382        RpcCallback<RESP> done);
383  }
384
385  @FunctionalInterface
386  private interface AdminRpcCall<RESP, REQ> {
387    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
388        RpcCallback<RESP> done);
389  }
390
  /**
   * Converts a value of one type into another; used by the rpc helpers in this class to turn a
   * response into the value that completes the returned future.
   * @param <D> the destination type
   * @param <S> the source type
   */
  @FunctionalInterface
  private interface Converter<D, S> {
    /**
     * Converts {@code src} into the destination type.
     * @param src the value to convert
     * @return the converted value
     * @throws IOException if the conversion fails
     */
    D convert(S src) throws IOException;
  }
395
396  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
397      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
398      Converter<RESP, PRESP> respConverter) {
399    CompletableFuture<RESP> future = new CompletableFuture<>();
400    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
401
402      @Override
403      public void run(PRESP resp) {
404        if (controller.failed()) {
405          future.completeExceptionally(controller.getFailed());
406        } else {
407          try {
408            future.complete(respConverter.convert(resp));
409          } catch (IOException e) {
410            future.completeExceptionally(e);
411          }
412        }
413      }
414    });
415    return future;
416  }
417
418  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
419      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
420      Converter<RESP, PRESP> respConverter) {
421    CompletableFuture<RESP> future = new CompletableFuture<>();
422    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
423
424      @Override
425      public void run(PRESP resp) {
426        if (controller.failed()) {
427          future.completeExceptionally(controller.getFailed());
428        } else {
429          try {
430            future.complete(respConverter.convert(resp));
431          } catch (IOException e) {
432            future.completeExceptionally(e);
433          }
434        }
435      }
436    });
437    return future;
438  }
439
440  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
441      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
442      ProcedureBiConsumer consumer) {
443    return procedureCall(b -> {
444    }, preq, rpcCall, respConverter, consumer);
445  }
446
447  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
448      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
449      ProcedureBiConsumer consumer) {
450    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
451  }
452
453  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
454      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
455      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
456      ProcedureBiConsumer consumer) {
457    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
458        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
459    prioritySetter.accept(builder);
460    CompletableFuture<Long> procFuture = builder.call();
461    CompletableFuture<Void> future = waitProcedureResult(procFuture);
462    addListener(future, consumer);
463    return future;
464  }
465
  /** An asynchronous operation applied to a single table. */
  @FunctionalInterface
  private interface TableOperator {
    /**
     * Runs the operation on the given table.
     * @param table the table to operate on
     * @return a future for the operation; it carries no value
     */
    CompletableFuture<Void> operate(TableName table);
  }
470
471  @Override
472  public CompletableFuture<Boolean> tableExists(TableName tableName) {
473    if (TableName.isMetaTableName(tableName)) {
474      return CompletableFuture.completedFuture(true);
475    }
476    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
477  }
478
479  @Override
480  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
481    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
482      includeSysTables));
483  }
484
485  /**
486   * {@link #listTableDescriptors(boolean)}
487   */
488  @Override
489  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
490      boolean includeSysTables) {
491    Preconditions.checkNotNull(pattern,
492      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
493    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
494      includeSysTables));
495  }
496
497  @Override
498  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
499    Preconditions.checkNotNull(tableNames,
500      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
501    if (tableNames.isEmpty()) {
502      return CompletableFuture.completedFuture(Collections.emptyList());
503    }
504    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
505  }
506
507  private CompletableFuture<List<TableDescriptor>>
508      getTableDescriptors(GetTableDescriptorsRequest request) {
509    return this.<List<TableDescriptor>> newMasterCaller()
510        .action((controller, stub) -> this
511            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
512              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
513              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
514        .call();
515  }
516
517  @Override
518  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
519    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
520  }
521
522  @Override
523  public CompletableFuture<List<TableName>>
524      listTableNames(Pattern pattern, boolean includeSysTables) {
525    Preconditions.checkNotNull(pattern,
526        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
527    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
528  }
529
530  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
531    return this
532        .<List<TableName>> newMasterCaller()
533        .action(
534          (controller, stub) -> this
535              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
536                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
537                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
538  }
539
540  @Override
541  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
542    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
543        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
544        List<TableDescriptor>> call(
545          controller, stub,
546          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
547          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
548          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
549        .call();
550  }
551
552  @Override
553  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
554    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
555        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
556        List<TableName>> call(
557          controller, stub,
558          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
559          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
560          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
561        .call();
562  }
563
564  @Override
565  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
566    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
567    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
568      .action((controller, stub) -> this
569        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
570          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
571          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
572          (resp) -> resp.getTableSchemaList()))
573      .call(), (tableSchemas, error) -> {
574        if (error != null) {
575          future.completeExceptionally(error);
576          return;
577        }
578        if (!tableSchemas.isEmpty()) {
579          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
580        } else {
581          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
582        }
583      });
584    return future;
585  }
586
587  @Override
588  public CompletableFuture<Void> createTable(TableDescriptor desc) {
589    return createTable(desc.getTableName(),
590      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
591  }
592
593  @Override
594  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
595      int numRegions) {
596    try {
597      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
598    } catch (IllegalArgumentException e) {
599      return failedFuture(e);
600    }
601  }
602
603  @Override
604  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
605    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
606        + " use createTable(TableDescriptor) instead");
607    try {
608      verifySplitKeys(splitKeys);
609      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
610        splitKeys, ng.getNonceGroup(), ng.newNonce()));
611    } catch (IllegalArgumentException e) {
612      return failedFuture(e);
613    }
614  }
615
616  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
617    Preconditions.checkNotNull(tableName, "table name is null");
618    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
619      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
620      new CreateTableProcedureBiConsumer(tableName));
621  }
622
623  @Override
624  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
625    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
626      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
627        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
628      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
629  }
630
631  @Override
632  public CompletableFuture<Void> deleteTable(TableName tableName) {
633    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
634      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
635      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
636      new DeleteTableProcedureBiConsumer(tableName));
637  }
638
639  @Override
640  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
641    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
642      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
643        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
644      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
645  }
646
647  @Override
648  public CompletableFuture<Void> enableTable(TableName tableName) {
649    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
650      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
651      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
652      new EnableTableProcedureBiConsumer(tableName));
653  }
654
655  @Override
656  public CompletableFuture<Void> disableTable(TableName tableName) {
657    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
658      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
659      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
660      new DisableTableProcedureBiConsumer(tableName));
661  }
662
663  @Override
664  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
665    if (TableName.isMetaTableName(tableName)) {
666      return CompletableFuture.completedFuture(true);
667    }
668    CompletableFuture<Boolean> future = new CompletableFuture<>();
669    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
670      if (error != null) {
671        future.completeExceptionally(error);
672        return;
673      }
674      if (state.isPresent()) {
675        future.complete(state.get().inStates(TableState.State.ENABLED));
676      } else {
677        future.completeExceptionally(new TableNotFoundException(tableName));
678      }
679    });
680    return future;
681  }
682
683  @Override
684  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
685    if (TableName.isMetaTableName(tableName)) {
686      return CompletableFuture.completedFuture(false);
687    }
688    CompletableFuture<Boolean> future = new CompletableFuture<>();
689    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
690      if (error != null) {
691        future.completeExceptionally(error);
692        return;
693      }
694      if (state.isPresent()) {
695        future.complete(state.get().inStates(TableState.State.DISABLED));
696      } else {
697        future.completeExceptionally(new TableNotFoundException(tableName));
698      }
699    });
700    return future;
701  }
702
703  @Override
704  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
705    if (TableName.isMetaTableName(tableName)) {
706      return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
707        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
708    }
709    CompletableFuture<Boolean> future = new CompletableFuture<>();
710    addListener(isTableEnabled(tableName), (enabled, error) -> {
711      if (error != null) {
712        if (error instanceof TableNotFoundException) {
713          future.complete(false);
714        } else {
715          future.completeExceptionally(error);
716        }
717        return;
718      }
719      if (!enabled) {
720        future.complete(false);
721      } else {
722        addListener(
723          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
724          (locations, error1) -> {
725            if (error1 != null) {
726              future.completeExceptionally(error1);
727              return;
728            }
729            List<HRegionLocation> notDeployedRegions = locations.stream()
730              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
731            if (notDeployedRegions.size() > 0) {
732              if (LOG.isDebugEnabled()) {
733                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
734              }
735              future.complete(false);
736              return;
737            }
738            future.complete(true);
739          });
740      }
741    });
742    return future;
743  }
744
745  @Override
746  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
747    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
748      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
749        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
750      new AddColumnFamilyProcedureBiConsumer(tableName));
751  }
752
753  @Override
754  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
755    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
756      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
757        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
758      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
759  }
760
761  @Override
762  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
763      ColumnFamilyDescriptor columnFamily) {
764    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
765      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
766        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
767      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
768  }
769
770  @Override
771  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
772    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
773      RequestConverter.buildCreateNamespaceRequest(descriptor),
774      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
775      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
776  }
777
778  @Override
779  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
780    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
781      RequestConverter.buildModifyNamespaceRequest(descriptor),
782      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
783      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
784  }
785
786  @Override
787  public CompletableFuture<Void> deleteNamespace(String name) {
788    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
789      RequestConverter.buildDeleteNamespaceRequest(name),
790      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
791      new DeleteNamespaceProcedureBiConsumer(name));
792  }
793
794  @Override
795  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
796    return this
797        .<NamespaceDescriptor> newMasterCaller()
798        .action(
799          (controller, stub) -> this
800              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
801                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
802                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
803                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
804  }
805
806  @Override
807  public CompletableFuture<List<String>> listNamespaces() {
808    return this
809        .<List<String>> newMasterCaller()
810        .action(
811          (controller, stub) -> this
812              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
813                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
814                  done) -> s.listNamespaces(c, req, done),
815                (resp) -> resp.getNamespaceNameList())).call();
816  }
817
818  @Override
819  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
820    return this
821        .<List<NamespaceDescriptor>> newMasterCaller()
822        .action(
823          (controller, stub) -> this
824              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
825                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
826                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
827                    .toNamespaceDescriptorList(resp))).call();
828  }
829
830  @Override
831  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
832    return this.<List<RegionInfo>> newAdminCaller()
833        .action((controller, stub) -> this
834            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
835              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
836              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
837              resp -> ProtobufUtil.getRegionInfos(resp)))
838        .serverName(serverName).call();
839  }
840
841  @Override
842  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
843    if (tableName.equals(META_TABLE_NAME)) {
844      return connection.registry.getMetaRegionLocation()
845        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
846          .collect(Collectors.toList()));
847    } else {
848      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
849        .thenApply(
850          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
851    }
852  }
853
854  @Override
855  public CompletableFuture<Void> flush(TableName tableName) {
856    CompletableFuture<Void> future = new CompletableFuture<>();
857    addListener(tableExists(tableName), (exists, err) -> {
858      if (err != null) {
859        future.completeExceptionally(err);
860      } else if (!exists) {
861        future.completeExceptionally(new TableNotFoundException(tableName));
862      } else {
863        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
864          if (err2 != null) {
865            future.completeExceptionally(err2);
866          } else if (!tableEnabled) {
867            future.completeExceptionally(new TableNotEnabledException(tableName));
868          } else {
869            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
870              new HashMap<>()), (ret, err3) -> {
871                if (err3 != null) {
872                  future.completeExceptionally(err3);
873                } else {
874                  future.complete(ret);
875                }
876              });
877          }
878        });
879      }
880    });
881    return future;
882  }
883
884  @Override
885  public CompletableFuture<Void> flushRegion(byte[] regionName) {
886    return flushRegionInternal(regionName, false).thenAccept(r -> {
887    });
888  }
889
890  /**
891   * This method is for internal use only, where we need the response of the flush.
892   * <p/>
893   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
894   * API.
895   */
896  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
897      boolean writeFlushWALMarker) {
898    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
899    addListener(getRegionLocation(regionName), (location, err) -> {
900      if (err != null) {
901        future.completeExceptionally(err);
902        return;
903      }
904      ServerName serverName = location.getServerName();
905      if (serverName == null) {
906        future
907          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
908        return;
909      }
910      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
911        if (err2 != null) {
912          future.completeExceptionally(err2);
913        } else {
914          future.complete(ret);
915        }
916      });
917    });
918    return future;
919  }
920
921  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
922      boolean writeFlushWALMarker) {
923    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
924      .action((controller, stub) -> this
925        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
926          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
927          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
928      .call();
929  }
930
931  @Override
932  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
933    CompletableFuture<Void> future = new CompletableFuture<>();
934    addListener(getRegions(sn), (hRegionInfos, err) -> {
935      if (err != null) {
936        future.completeExceptionally(err);
937        return;
938      }
939      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
940      if (hRegionInfos != null) {
941        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
942        })));
943      }
944      addListener(CompletableFuture.allOf(
945        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
946          if (err2 != null) {
947            future.completeExceptionally(err2);
948          } else {
949            future.complete(ret);
950          }
951        });
952    });
953    return future;
954  }
955
956  @Override
957  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
958    return compact(tableName, null, false, compactType);
959  }
960
961  @Override
962  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
963      CompactType compactType) {
964    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
965        + "If you don't specify a columnFamily, use compact(TableName) instead");
966    return compact(tableName, columnFamily, false, compactType);
967  }
968
969  @Override
970  public CompletableFuture<Void> compactRegion(byte[] regionName) {
971    return compactRegion(regionName, null, false);
972  }
973
974  @Override
975  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
976    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
977        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
978    return compactRegion(regionName, columnFamily, false);
979  }
980
981  @Override
982  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
983    return compact(tableName, null, true, compactType);
984  }
985
986  @Override
987  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
988      CompactType compactType) {
989    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
990        + "If you don't specify a columnFamily, use compact(TableName) instead");
991    return compact(tableName, columnFamily, true, compactType);
992  }
993
994  @Override
995  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
996    return compactRegion(regionName, null, true);
997  }
998
999  @Override
1000  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1001    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1002        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1003    return compactRegion(regionName, columnFamily, true);
1004  }
1005
1006  @Override
1007  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1008    return compactRegionServer(sn, false);
1009  }
1010
1011  @Override
1012  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1013    return compactRegionServer(sn, true);
1014  }
1015
1016  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1017    CompletableFuture<Void> future = new CompletableFuture<>();
1018    addListener(getRegions(sn), (hRegionInfos, err) -> {
1019      if (err != null) {
1020        future.completeExceptionally(err);
1021        return;
1022      }
1023      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1024      if (hRegionInfos != null) {
1025        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1026      }
1027      addListener(CompletableFuture.allOf(
1028        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1029          if (err2 != null) {
1030            future.completeExceptionally(err2);
1031          } else {
1032            future.complete(ret);
1033          }
1034        });
1035    });
1036    return future;
1037  }
1038
1039  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1040      boolean major) {
1041    CompletableFuture<Void> future = new CompletableFuture<>();
1042    addListener(getRegionLocation(regionName), (location, err) -> {
1043      if (err != null) {
1044        future.completeExceptionally(err);
1045        return;
1046      }
1047      ServerName serverName = location.getServerName();
1048      if (serverName == null) {
1049        future
1050          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1051        return;
1052      }
1053      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1054        (ret, err2) -> {
1055          if (err2 != null) {
1056            future.completeExceptionally(err2);
1057          } else {
1058            future.complete(ret);
1059          }
1060        });
1061    });
1062    return future;
1063  }
1064
1065  /**
1066   * List all region locations for the specific table.
1067   */
1068  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1069    if (TableName.META_TABLE_NAME.equals(tableName)) {
1070      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1071      // For meta table, we use zk to fetch all locations.
1072      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
1073      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
1074        if (err != null) {
1075          future.completeExceptionally(err);
1076        } else if (metaRegions == null || metaRegions.isEmpty() ||
1077          metaRegions.getDefaultRegionLocation() == null) {
1078          future.completeExceptionally(new IOException("meta region does not found"));
1079        } else {
1080          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1081        }
1082        // close the registry.
1083        IOUtils.closeQuietly(registry);
1084      });
1085      return future;
1086    } else {
1087      // For non-meta table, we fetch all locations by scanning hbase:meta table
1088      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
1089    }
1090  }
1091
1092  /**
1093   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1094   */
1095  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1096      CompactType compactType) {
1097    CompletableFuture<Void> future = new CompletableFuture<>();
1098
1099    switch (compactType) {
1100      case MOB:
1101        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
1102          if (err != null) {
1103            future.completeExceptionally(err);
1104            return;
1105          }
1106          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1107          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1108            if (err2 != null) {
1109              future.completeExceptionally(err2);
1110            } else {
1111              future.complete(ret);
1112            }
1113          });
1114        });
1115        break;
1116      case NORMAL:
1117        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1118          if (err != null) {
1119            future.completeExceptionally(err);
1120            return;
1121          }
1122          if (locations == null || locations.isEmpty()) {
1123            future.completeExceptionally(new TableNotFoundException(tableName));
1124          }
1125          CompletableFuture<?>[] compactFutures =
1126            locations.stream().filter(l -> l.getRegion() != null)
1127              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1128              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1129              .toArray(CompletableFuture<?>[]::new);
1130          // future complete unless all of the compact futures are completed.
1131          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1132            if (err2 != null) {
1133              future.completeExceptionally(err2);
1134            } else {
1135              future.complete(ret);
1136            }
1137          });
1138        });
1139        break;
1140      default:
1141        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1142    }
1143    return future;
1144  }
1145
1146  /**
1147   * Compact the region at specific region server.
1148   */
1149  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1150      final boolean major, byte[] columnFamily) {
1151    return this
1152        .<Void> newAdminCaller()
1153        .serverName(sn)
1154        .action(
1155          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1156            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1157              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1158            resp -> null)).call();
1159  }
1160
1161  private byte[] toEncodeRegionName(byte[] regionName) {
1162    try {
1163      return RegionInfo.isEncodedRegionName(regionName) ? regionName
1164          : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1165    } catch (IOException e) {
1166      return regionName;
1167    }
1168  }
1169
1170  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1171      CompletableFuture<TableName> result) {
1172    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1173      if (err != null) {
1174        result.completeExceptionally(err);
1175        return;
1176      }
1177      RegionInfo regionInfo = location.getRegion();
1178      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1179        result.completeExceptionally(
1180          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1181        return;
1182      }
1183      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1184        if (!tableName.get().equals(regionInfo.getTable())) {
1185          // tables of this two region should be same.
1186          result.completeExceptionally(
1187            new IllegalArgumentException("Cannot merge regions from two different tables " +
1188              tableName.get() + " and " + regionInfo.getTable()));
1189        } else {
1190          result.complete(tableName.get());
1191        }
1192      }
1193    });
1194  }
1195
1196  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1197    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1198    CompletableFuture<TableName> future = new CompletableFuture<>();
1199    for (byte[] encodedRegionName : encodedRegionNames) {
1200      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1201    }
1202    return future;
1203  }
1204
1205  @Override
1206  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1207    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1208  }
1209
1210  @Override
1211  public CompletableFuture<Boolean> isMergeEnabled() {
1212    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1213  }
1214
1215  @Override
1216  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1217    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1218  }
1219
1220  @Override
1221  public CompletableFuture<Boolean> isSplitEnabled() {
1222    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1223  }
1224
1225  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1226      MasterSwitchType switchType) {
1227    SetSplitOrMergeEnabledRequest request =
1228      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1229    return this.<Boolean> newMasterCaller()
1230      .action((controller, stub) -> this
1231        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1232          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1233          (resp) -> resp.getPrevValueList().get(0)))
1234      .call();
1235  }
1236
1237  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1238    IsSplitOrMergeEnabledRequest request =
1239        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1240    return this
1241        .<Boolean> newMasterCaller()
1242        .action(
1243          (controller, stub) -> this
1244              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1245                controller, stub, request,
1246                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1247                (resp) -> resp.getEnabled())).call();
1248  }
1249
1250  @Override
1251  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1252    if (nameOfRegionsToMerge.size() < 2) {
1253      return failedFuture(new IllegalArgumentException(
1254        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1255    }
1256    CompletableFuture<Void> future = new CompletableFuture<>();
1257    byte[][] encodedNameOfRegionsToMerge =
1258      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1259
1260    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1261      if (err != null) {
1262        future.completeExceptionally(err);
1263        return;
1264      }
1265
1266      MergeTableRegionsRequest request = null;
1267      try {
1268        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1269          forcible, ng.getNonceGroup(), ng.newNonce());
1270      } catch (DeserializationException e) {
1271        future.completeExceptionally(e);
1272        return;
1273      }
1274
1275      addListener(
1276        this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
1277          (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1278          new MergeTableRegionProcedureBiConsumer(tableName)),
1279        (ret, err2) -> {
1280          if (err2 != null) {
1281            future.completeExceptionally(err2);
1282          } else {
1283            future.complete(ret);
1284          }
1285        });
1286    });
1287    return future;
1288  }
1289
1290  @Override
1291  public CompletableFuture<Void> split(TableName tableName) {
1292    CompletableFuture<Void> future = new CompletableFuture<>();
1293    addListener(tableExists(tableName), (exist, error) -> {
1294      if (error != null) {
1295        future.completeExceptionally(error);
1296        return;
1297      }
1298      if (!exist) {
1299        future.completeExceptionally(new TableNotFoundException(tableName));
1300        return;
1301      }
1302      addListener(
1303        metaTable
1304          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1305            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1306            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1307        (results, err2) -> {
1308          if (err2 != null) {
1309            future.completeExceptionally(err2);
1310            return;
1311          }
1312          if (results != null && !results.isEmpty()) {
1313            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1314            for (Result r : results) {
1315              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1316                continue;
1317              }
1318              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1319              if (rl != null) {
1320                for (HRegionLocation h : rl.getRegionLocations()) {
1321                  if (h != null && h.getServerName() != null) {
1322                    RegionInfo hri = h.getRegion();
1323                    if (hri == null || hri.isSplitParent() ||
1324                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1325                      continue;
1326                    }
1327                    splitFutures.add(split(hri, null));
1328                  }
1329                }
1330              }
1331            }
1332            addListener(
1333              CompletableFuture
1334                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1335              (ret, exception) -> {
1336                if (exception != null) {
1337                  future.completeExceptionally(exception);
1338                  return;
1339                }
1340                future.complete(ret);
1341              });
1342          } else {
1343            future.complete(null);
1344          }
1345        });
1346    });
1347    return future;
1348  }
1349
1350  @Override
1351  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1352    CompletableFuture<Void> result = new CompletableFuture<>();
1353    if (splitPoint == null) {
1354      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1355    }
1356    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1357      (loc, err) -> {
1358        if (err != null) {
1359          result.completeExceptionally(err);
1360        } else if (loc == null || loc.getRegion() == null) {
1361          result.completeExceptionally(new IllegalArgumentException(
1362            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1363        } else {
1364          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1365            if (err2 != null) {
1366              result.completeExceptionally(err2);
1367            } else {
1368              result.complete(ret);
1369            }
1370
1371          });
1372        }
1373      });
1374    return result;
1375  }
1376
1377  @Override
1378  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1379    CompletableFuture<Void> future = new CompletableFuture<>();
1380    addListener(getRegionLocation(regionName), (location, err) -> {
1381      if (err != null) {
1382        future.completeExceptionally(err);
1383        return;
1384      }
1385      RegionInfo regionInfo = location.getRegion();
1386      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1387        future
1388          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1389            "Replicas are auto-split when their primary is split."));
1390        return;
1391      }
1392      ServerName serverName = location.getServerName();
1393      if (serverName == null) {
1394        future
1395          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1396        return;
1397      }
1398      addListener(split(regionInfo, null), (ret, err2) -> {
1399        if (err2 != null) {
1400          future.completeExceptionally(err2);
1401        } else {
1402          future.complete(ret);
1403        }
1404      });
1405    });
1406    return future;
1407  }
1408
1409  @Override
1410  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1411    Preconditions.checkNotNull(splitPoint,
1412      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1413    CompletableFuture<Void> future = new CompletableFuture<>();
1414    addListener(getRegionLocation(regionName), (location, err) -> {
1415      if (err != null) {
1416        future.completeExceptionally(err);
1417        return;
1418      }
1419      RegionInfo regionInfo = location.getRegion();
1420      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1421        future
1422          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1423            "Replicas are auto-split when their primary is split."));
1424        return;
1425      }
1426      ServerName serverName = location.getServerName();
1427      if (serverName == null) {
1428        future
1429          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1430        return;
1431      }
1432      if (regionInfo.getStartKey() != null &&
1433        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1434        future.completeExceptionally(
1435          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1436        return;
1437      }
1438      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1439        if (err2 != null) {
1440          future.completeExceptionally(err2);
1441        } else {
1442          future.complete(ret);
1443        }
1444      });
1445    });
1446    return future;
1447  }
1448
1449  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1450    CompletableFuture<Void> future = new CompletableFuture<>();
1451    TableName tableName = hri.getTable();
1452    SplitTableRegionRequest request = null;
1453    try {
1454      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1455        ng.newNonce());
1456    } catch (DeserializationException e) {
1457      future.completeExceptionally(e);
1458      return future;
1459    }
1460
1461    addListener(
1462      this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
1463        request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1464        new SplitTableRegionProcedureBiConsumer(tableName)),
1465      (ret, err2) -> {
1466        if (err2 != null) {
1467          future.completeExceptionally(err2);
1468        } else {
1469          future.complete(ret);
1470        }
1471      });
1472    return future;
1473  }
1474
1475  @Override
1476  public CompletableFuture<Void> assign(byte[] regionName) {
1477    CompletableFuture<Void> future = new CompletableFuture<>();
1478    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1479      if (err != null) {
1480        future.completeExceptionally(err);
1481        return;
1482      }
1483      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1484        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1485          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1486          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1487        .call(), (ret, err2) -> {
1488          if (err2 != null) {
1489            future.completeExceptionally(err2);
1490          } else {
1491            future.complete(ret);
1492          }
1493        });
1494    });
1495    return future;
1496  }
1497
1498  @Override
1499  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1500    CompletableFuture<Void> future = new CompletableFuture<>();
1501    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1502      if (err != null) {
1503        future.completeExceptionally(err);
1504        return;
1505      }
1506      addListener(
1507        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1508          .action(((controller, stub) -> this
1509            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1510              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1511              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1512          .call(),
1513        (ret, err2) -> {
1514          if (err2 != null) {
1515            future.completeExceptionally(err2);
1516          } else {
1517            future.complete(ret);
1518          }
1519        });
1520    });
1521    return future;
1522  }
1523
1524  @Override
1525  public CompletableFuture<Void> offline(byte[] regionName) {
1526    CompletableFuture<Void> future = new CompletableFuture<>();
1527    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1528      if (err != null) {
1529        future.completeExceptionally(err);
1530        return;
1531      }
1532      addListener(
1533        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1534          .action(((controller, stub) -> this
1535            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1536              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1537              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1538          .call(),
1539        (ret, err2) -> {
1540          if (err2 != null) {
1541            future.completeExceptionally(err2);
1542          } else {
1543            future.complete(ret);
1544          }
1545        });
1546    });
1547    return future;
1548  }
1549
1550  @Override
1551  public CompletableFuture<Void> move(byte[] regionName) {
1552    CompletableFuture<Void> future = new CompletableFuture<>();
1553    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1554      if (err != null) {
1555        future.completeExceptionally(err);
1556        return;
1557      }
1558      addListener(
1559        moveRegion(regionInfo,
1560          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1561        (ret, err2) -> {
1562          if (err2 != null) {
1563            future.completeExceptionally(err2);
1564          } else {
1565            future.complete(ret);
1566          }
1567        });
1568    });
1569    return future;
1570  }
1571
1572  @Override
1573  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1574    Preconditions.checkNotNull(destServerName,
1575      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1576    CompletableFuture<Void> future = new CompletableFuture<>();
1577    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1578      if (err != null) {
1579        future.completeExceptionally(err);
1580        return;
1581      }
1582      addListener(
1583        moveRegion(regionInfo, RequestConverter
1584          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1585        (ret, err2) -> {
1586          if (err2 != null) {
1587            future.completeExceptionally(err2);
1588          } else {
1589            future.complete(ret);
1590          }
1591        });
1592    });
1593    return future;
1594  }
1595
1596  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1597    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1598      .action(
1599        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1600          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1601      .call();
1602  }
1603
1604  @Override
1605  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1606    return this
1607        .<Void> newMasterCaller()
1608        .action(
1609          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1610            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1611            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1612  }
1613
1614  @Override
1615  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1616    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1617    Scan scan = QuotaTableUtil.makeScan(filter);
1618    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1619        .scan(scan, new AdvancedScanResultConsumer() {
1620          List<QuotaSettings> settings = new ArrayList<>();
1621
1622          @Override
1623          public void onNext(Result[] results, ScanController controller) {
1624            for (Result result : results) {
1625              try {
1626                QuotaTableUtil.parseResultToCollection(result, settings);
1627              } catch (IOException e) {
1628                controller.terminate();
1629                future.completeExceptionally(e);
1630              }
1631            }
1632          }
1633
1634          @Override
1635          public void onError(Throwable error) {
1636            future.completeExceptionally(error);
1637          }
1638
1639          @Override
1640          public void onComplete() {
1641            future.complete(settings);
1642          }
1643        });
1644    return future;
1645  }
1646
1647  @Override
1648  public CompletableFuture<Void> addReplicationPeer(String peerId,
1649      ReplicationPeerConfig peerConfig, boolean enabled) {
1650    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1651      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1652      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1653      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1654  }
1655
1656  @Override
1657  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1658    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1659      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1660      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1661      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1662  }
1663
1664  @Override
1665  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1666    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1667      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1668      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1669      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1670  }
1671
1672  @Override
1673  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1674    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1675      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1676      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1677      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1678  }
1679
1680  @Override
1681  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1682    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1683      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1684        controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1685        (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1686        (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1687      .call();
1688  }
1689
1690  @Override
1691  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1692      ReplicationPeerConfig peerConfig) {
1693    return this
1694      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1695        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1696        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1697        (resp) -> resp.getProcId(),
1698        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1699  }
1700
1701  @Override
1702  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1703      SyncReplicationState clusterState) {
1704    return this
1705      .<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1706        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1707          clusterState),
1708        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1709        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1710          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1711  }
1712
1713  @Override
1714  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1715      Map<TableName, List<String>> tableCfs) {
1716    if (tableCfs == null) {
1717      return failedFuture(new ReplicationException("tableCfs is null"));
1718    }
1719
1720    CompletableFuture<Void> future = new CompletableFuture<Void>();
1721    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1722      if (!completeExceptionally(future, error)) {
1723        ReplicationPeerConfig newPeerConfig =
1724          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1725        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1726          if (!completeExceptionally(future, error)) {
1727            future.complete(result);
1728          }
1729        });
1730      }
1731    });
1732    return future;
1733  }
1734
1735  @Override
1736  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1737      Map<TableName, List<String>> tableCfs) {
1738    if (tableCfs == null) {
1739      return failedFuture(new ReplicationException("tableCfs is null"));
1740    }
1741
1742    CompletableFuture<Void> future = new CompletableFuture<Void>();
1743    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1744      if (!completeExceptionally(future, error)) {
1745        ReplicationPeerConfig newPeerConfig = null;
1746        try {
1747          newPeerConfig = ReplicationPeerConfigUtil
1748            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1749        } catch (ReplicationException e) {
1750          future.completeExceptionally(e);
1751          return;
1752        }
1753        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1754          if (!completeExceptionally(future, error)) {
1755            future.complete(result);
1756          }
1757        });
1758      }
1759    });
1760    return future;
1761  }
1762
1763  @Override
1764  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1765    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1766  }
1767
1768  @Override
1769  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1770    Preconditions.checkNotNull(pattern,
1771      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1772    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1773  }
1774
1775  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1776      ListReplicationPeersRequest request) {
1777    return this
1778        .<List<ReplicationPeerDescription>> newMasterCaller()
1779        .action(
1780          (controller, stub) -> this
1781              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
1782                controller,
1783                stub,
1784                request,
1785                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1786                (resp) -> resp.getPeerDescList().stream()
1787                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1788                    .collect(Collectors.toList()))).call();
1789  }
1790
1791  @Override
1792  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1793    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1794    addListener(listTableDescriptors(), (tables, error) -> {
1795      if (!completeExceptionally(future, error)) {
1796        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1797        tables.forEach(table -> {
1798          Map<String, Integer> cfs = new HashMap<>();
1799          Stream.of(table.getColumnFamilies())
1800            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1801            .forEach(column -> {
1802              cfs.put(column.getNameAsString(), column.getScope());
1803            });
1804          if (!cfs.isEmpty()) {
1805            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1806          }
1807        });
1808        future.complete(replicatedTableCFs);
1809      }
1810    });
1811    return future;
1812  }
1813
1814  @Override
1815  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1816    SnapshotProtos.SnapshotDescription snapshot =
1817      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1818    try {
1819      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1820    } catch (IllegalArgumentException e) {
1821      return failedFuture(e);
1822    }
1823    CompletableFuture<Void> future = new CompletableFuture<>();
1824    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1825    addListener(this.<Long> newMasterCaller()
1826      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1827        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1828        resp -> resp.getExpectedTimeout()))
1829      .call(), (expectedTimeout, err) -> {
1830        if (err != null) {
1831          future.completeExceptionally(err);
1832          return;
1833        }
1834        TimerTask pollingTask = new TimerTask() {
1835          int tries = 0;
1836          long startTime = EnvironmentEdgeManager.currentTime();
1837          long endTime = startTime + expectedTimeout;
1838          long maxPauseTime = expectedTimeout / maxAttempts;
1839
1840          @Override
1841          public void run(Timeout timeout) throws Exception {
1842            if (EnvironmentEdgeManager.currentTime() < endTime) {
1843              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1844                if (err2 != null) {
1845                  future.completeExceptionally(err2);
1846                } else if (done) {
1847                  future.complete(null);
1848                } else {
1849                  // retry again after pauseTime.
1850                  long pauseTime =
1851                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1852                  pauseTime = Math.min(pauseTime, maxPauseTime);
1853                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1854                    TimeUnit.MILLISECONDS);
1855                }
1856              });
1857            } else {
1858              future.completeExceptionally(
1859                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1860                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1861            }
1862          }
1863        };
1864        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1865      });
1866    return future;
1867  }
1868
1869  @Override
1870  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1871    return this
1872        .<Boolean> newMasterCaller()
1873        .action(
1874          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1875            controller,
1876            stub,
1877            IsSnapshotDoneRequest.newBuilder()
1878                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1879                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1880  }
1881
1882  @Override
1883  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1884    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1885      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1886      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1887    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1888  }
1889
1890  @Override
1891  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1892      boolean restoreAcl) {
1893    CompletableFuture<Void> future = new CompletableFuture<>();
1894    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1895      if (err != null) {
1896        future.completeExceptionally(err);
1897        return;
1898      }
1899      TableName tableName = null;
1900      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1901        for (SnapshotDescription snap : snapshotDescriptions) {
1902          if (snap.getName().equals(snapshotName)) {
1903            tableName = snap.getTableName();
1904            break;
1905          }
1906        }
1907      }
1908      if (tableName == null) {
1909        future.completeExceptionally(new RestoreSnapshotException(
1910          "Unable to find the table name for snapshot=" + snapshotName));
1911        return;
1912      }
1913      final TableName finalTableName = tableName;
1914      addListener(tableExists(finalTableName), (exists, err2) -> {
1915        if (err2 != null) {
1916          future.completeExceptionally(err2);
1917        } else if (!exists) {
1918          // if table does not exist, then just clone snapshot into new table.
1919          completeConditionalOnFuture(future,
1920            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1921        } else {
1922          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1923            if (err4 != null) {
1924              future.completeExceptionally(err4);
1925            } else if (!disabled) {
1926              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1927            } else {
1928              completeConditionalOnFuture(future,
1929                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1930            }
1931          });
1932        }
1933      });
1934    });
1935    return future;
1936  }
1937
1938  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1939      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1940    if (takeFailSafeSnapshot) {
1941      CompletableFuture<Void> future = new CompletableFuture<>();
1942      // Step.1 Take a snapshot of the current state
1943      String failSafeSnapshotSnapshotNameFormat =
1944        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1945          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1946      final String failSafeSnapshotSnapshotName =
1947        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1948          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1949          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1950      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1951      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1952        if (err != null) {
1953          future.completeExceptionally(err);
1954        } else {
1955          // Step.2 Restore snapshot
1956          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1957            (void2, err2) -> {
1958              if (err2 != null) {
1959                // Step.3.a Something went wrong during the restore and try to rollback.
1960                addListener(
1961                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1962                  (void3, err3) -> {
1963                    if (err3 != null) {
1964                      future.completeExceptionally(err3);
1965                    } else {
1966                      String msg =
1967                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1968                          failSafeSnapshotSnapshotName + " succeeded.";
1969                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1970                    }
1971                  });
1972              } else {
1973                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1974                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1975                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1976                  if (err3 != null) {
1977                    LOG.error(
1978                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1979                      err3);
1980                  }
1981                  future.complete(ret3);
1982                });
1983              }
1984            });
1985        }
1986      });
1987      return future;
1988    } else {
1989      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
1990    }
1991  }
1992
1993  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
1994      CompletableFuture<T> parentFuture) {
1995    addListener(parentFuture, (res, err) -> {
1996      if (err != null) {
1997        dependentFuture.completeExceptionally(err);
1998      } else {
1999        dependentFuture.complete(res);
2000      }
2001    });
2002  }
2003
2004  @Override
2005  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2006      boolean restoreAcl) {
2007    CompletableFuture<Void> future = new CompletableFuture<>();
2008    addListener(tableExists(tableName), (exists, err) -> {
2009      if (err != null) {
2010        future.completeExceptionally(err);
2011      } else if (exists) {
2012        future.completeExceptionally(new TableExistsException(tableName));
2013      } else {
2014        completeConditionalOnFuture(future,
2015          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2016      }
2017    });
2018    return future;
2019  }
2020
2021  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2022      boolean restoreAcl) {
2023    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2024      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2025    try {
2026      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2027    } catch (IllegalArgumentException e) {
2028      return failedFuture(e);
2029    }
2030    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2031      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2032        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2033          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2034        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2035      .call());
2036  }
2037
2038  @Override
2039  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2040    return getCompletedSnapshots(null);
2041  }
2042
2043  @Override
2044  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2045    Preconditions.checkNotNull(pattern,
2046      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2047    return getCompletedSnapshots(pattern);
2048  }
2049
2050  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2051    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2052        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2053        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2054          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2055          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2056        .call();
2057  }
2058
2059  @Override
2060  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2061    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2062        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2063    return getCompletedSnapshots(tableNamePattern, null);
2064  }
2065
2066  @Override
2067  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2068      Pattern snapshotNamePattern) {
2069    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2070        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2071    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2072        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2073    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2074  }
2075
2076  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2077      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2078    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2079    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2080      if (err != null) {
2081        future.completeExceptionally(err);
2082        return;
2083      }
2084      if (tableNames == null || tableNames.size() <= 0) {
2085        future.complete(Collections.emptyList());
2086        return;
2087      }
2088      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2089        if (err2 != null) {
2090          future.completeExceptionally(err2);
2091          return;
2092        }
2093        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2094          future.complete(Collections.emptyList());
2095          return;
2096        }
2097        future.complete(snapshotDescList.stream()
2098          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2099          .collect(Collectors.toList()));
2100      });
2101    });
2102    return future;
2103  }
2104
2105  @Override
2106  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2107    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2108  }
2109
2110  @Override
2111  public CompletableFuture<Void> deleteSnapshots() {
2112    return internalDeleteSnapshots(null, null);
2113  }
2114
2115  @Override
2116  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2117    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2118        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2119    return internalDeleteSnapshots(null, snapshotNamePattern);
2120  }
2121
2122  @Override
2123  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2124    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2125        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2126    return internalDeleteSnapshots(tableNamePattern, null);
2127  }
2128
2129  @Override
2130  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2131      Pattern snapshotNamePattern) {
2132    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2133        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2134    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2135        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2136    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2137  }
2138
2139  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2140      Pattern snapshotNamePattern) {
2141    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2142    if (tableNamePattern == null) {
2143      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2144    } else {
2145      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2146    }
2147    CompletableFuture<Void> future = new CompletableFuture<>();
2148    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2149      if (err != null) {
2150        future.completeExceptionally(err);
2151        return;
2152      }
2153      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2154        future.complete(null);
2155        return;
2156      }
2157      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2158        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2159          if (e != null) {
2160            future.completeExceptionally(e);
2161          } else {
2162            future.complete(v);
2163          }
2164        });
2165    }));
2166    return future;
2167  }
2168
2169  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2170    return this
2171        .<Void> newMasterCaller()
2172        .action(
2173          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2174            controller,
2175            stub,
2176            DeleteSnapshotRequest.newBuilder()
2177                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2178                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2179  }
2180
2181  @Override
2182  public CompletableFuture<Void> execProcedure(String signature, String instance,
2183      Map<String, String> props) {
2184    CompletableFuture<Void> future = new CompletableFuture<>();
2185    ProcedureDescription procDesc =
2186      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2187    addListener(this.<Long> newMasterCaller()
2188      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2189        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2190        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2191      .call(), (expectedTimeout, err) -> {
2192        if (err != null) {
2193          future.completeExceptionally(err);
2194          return;
2195        }
2196        TimerTask pollingTask = new TimerTask() {
2197          int tries = 0;
2198          long startTime = EnvironmentEdgeManager.currentTime();
2199          long endTime = startTime + expectedTimeout;
2200          long maxPauseTime = expectedTimeout / maxAttempts;
2201
2202          @Override
2203          public void run(Timeout timeout) throws Exception {
2204            if (EnvironmentEdgeManager.currentTime() < endTime) {
2205              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2206                if (err2 != null) {
2207                  future.completeExceptionally(err2);
2208                  return;
2209                }
2210                if (done) {
2211                  future.complete(null);
2212                } else {
2213                  // retry again after pauseTime.
2214                  long pauseTime =
2215                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2216                  pauseTime = Math.min(pauseTime, maxPauseTime);
2217                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2218                    TimeUnit.MICROSECONDS);
2219                }
2220              });
2221            } else {
2222              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2223                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2224            }
2225          }
2226        };
2227        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2228        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2229      });
2230    return future;
2231  }
2232
2233  @Override
2234  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2235      Map<String, String> props) {
2236    ProcedureDescription proDesc =
2237        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2238    return this.<byte[]> newMasterCaller()
2239        .action(
2240          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2241            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2242            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2243            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2244        .call();
2245  }
2246
2247  @Override
2248  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2249      Map<String, String> props) {
2250    ProcedureDescription proDesc =
2251        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2252    return this.<Boolean> newMasterCaller()
2253        .action((controller, stub) -> this
2254            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2255              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2256              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2257        .call();
2258  }
2259
2260  @Override
2261  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2262    return this.<Boolean> newMasterCaller().action(
2263      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2264        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2265        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2266        .call();
2267  }
2268
2269  @Override
2270  public CompletableFuture<String> getProcedures() {
2271    return this
2272        .<String> newMasterCaller()
2273        .action(
2274          (controller, stub) -> this
2275              .<GetProceduresRequest, GetProceduresResponse, String> call(
2276                controller, stub, GetProceduresRequest.newBuilder().build(),
2277                (s, c, req, done) -> s.getProcedures(c, req, done),
2278                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2279  }
2280
2281  @Override
2282  public CompletableFuture<String> getLocks() {
2283    return this
2284        .<String> newMasterCaller()
2285        .action(
2286          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2287            controller, stub, GetLocksRequest.newBuilder().build(),
2288            (s, c, req, done) -> s.getLocks(c, req, done),
2289            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2290  }
2291
2292  @Override
2293  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
2294    return this.<Void> newMasterCaller()
2295        .action((controller, stub) -> this
2296          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2297            controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2298            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2299        .call();
2300  }
2301
2302  @Override
2303  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2304    return this.<List<ServerName>> newMasterCaller()
2305        .action((controller, stub) -> this
2306          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2307            List<ServerName>> call(
2308              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2309              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2310              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2311                  .collect(Collectors.toList())))
2312        .call();
2313  }
2314
2315  @Override
2316  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2317      List<byte[]> encodedRegionNames) {
2318    return this.<Void> newMasterCaller()
2319        .action((controller, stub) -> this
2320          .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
2321            stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2322            (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2323        .call();
2324  }
2325
2326  /**
2327   * Get the region location for the passed region name. The region name may be a full region name
2328   * or encoded region name. If the region does not found, then it'll throw an
2329   * UnknownRegionException wrapped by a {@link CompletableFuture}
2330   * @param regionNameOrEncodedRegionName region name or encoded region name
2331   * @return region location, wrapped by a {@link CompletableFuture}
2332   */
2333  @VisibleForTesting
2334  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2335    if (regionNameOrEncodedRegionName == null) {
2336      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2337    }
2338    try {
2339      CompletableFuture<Optional<HRegionLocation>> future;
2340      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2341        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2342        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2343          // old format encodedName, should be meta region
2344          future = connection.registry.getMetaRegionLocation()
2345            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2346              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2347        } else {
2348          future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2349            regionNameOrEncodedRegionName);
2350        }
2351      } else {
2352        RegionInfo regionInfo =
2353          MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2354        if (regionInfo.isMetaRegion()) {
2355          future = connection.registry.getMetaRegionLocation()
2356            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2357              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2358              .findFirst());
2359        } else {
2360          future =
2361            AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2362        }
2363      }
2364
2365      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2366      addListener(future, (location, err) -> {
2367        if (err != null) {
2368          returnedFuture.completeExceptionally(err);
2369          return;
2370        }
2371        if (!location.isPresent() || location.get().getRegion() == null) {
2372          returnedFuture.completeExceptionally(
2373            new UnknownRegionException("Invalid region name or encoded region name: " +
2374              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2375        } else {
2376          returnedFuture.complete(location.get());
2377        }
2378      });
2379      return returnedFuture;
2380    } catch (IOException e) {
2381      return failedFuture(e);
2382    }
2383  }
2384
2385  /**
2386   * Get the region info for the passed region name. The region name may be a full region name or
2387   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2388   * wrapped by a {@link CompletableFuture}
2389   * @param regionNameOrEncodedRegionName
2390   * @return region info, wrapped by a {@link CompletableFuture}
2391   */
2392  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2393    if (regionNameOrEncodedRegionName == null) {
2394      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2395    }
2396
2397    if (Bytes.equals(regionNameOrEncodedRegionName,
2398      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2399      Bytes.equals(regionNameOrEncodedRegionName,
2400        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2401      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2402    }
2403
2404    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2405    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2406      if (err != null) {
2407        future.completeExceptionally(err);
2408      } else {
2409        future.complete(location.getRegion());
2410      }
2411    });
2412    return future;
2413  }
2414
2415  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2416    if (numRegions < 3) {
2417      throw new IllegalArgumentException("Must create at least three regions");
2418    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2419      throw new IllegalArgumentException("Start key must be smaller than end key");
2420    }
2421    if (numRegions == 3) {
2422      return new byte[][] { startKey, endKey };
2423    }
2424    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2425    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2426      throw new IllegalArgumentException("Unable to split key range into enough regions");
2427    }
2428    return splitKeys;
2429  }
2430
2431  private void verifySplitKeys(byte[][] splitKeys) {
2432    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2433    // Verify there are no duplicate split keys
2434    byte[] lastKey = null;
2435    for (byte[] splitKey : splitKeys) {
2436      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2437        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2438      }
2439      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2440        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2441            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2442      }
2443      lastKey = splitKey;
2444    }
2445  }
2446
2447  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2448
2449    abstract void onFinished();
2450
2451    abstract void onError(Throwable error);
2452
2453    @Override
2454    public void accept(Void v, Throwable error) {
2455      if (error != null) {
2456        onError(error);
2457        return;
2458      }
2459      onFinished();
2460    }
2461  }
2462
2463  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2464    protected final TableName tableName;
2465
2466    TableProcedureBiConsumer(TableName tableName) {
2467      this.tableName = tableName;
2468    }
2469
2470    abstract String getOperationType();
2471
2472    String getDescription() {
2473      return "Operation: " + getOperationType() + ", " + "Table Name: "
2474          + tableName.getNameWithNamespaceInclAsString();
2475    }
2476
2477    @Override
2478    void onFinished() {
2479      LOG.info(getDescription() + " completed");
2480    }
2481
2482    @Override
2483    void onError(Throwable error) {
2484      LOG.info(getDescription() + " failed with " + error.getMessage());
2485    }
2486  }
2487
2488  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2489    protected final String namespaceName;
2490
2491    NamespaceProcedureBiConsumer(String namespaceName) {
2492      this.namespaceName = namespaceName;
2493    }
2494
2495    abstract String getOperationType();
2496
2497    String getDescription() {
2498      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2499    }
2500
2501    @Override
2502    void onFinished() {
2503      LOG.info(getDescription() + " completed");
2504    }
2505
2506    @Override
2507    void onError(Throwable error) {
2508      LOG.info(getDescription() + " failed with " + error.getMessage());
2509    }
2510  }
2511
2512  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2513
2514    CreateTableProcedureBiConsumer(TableName tableName) {
2515      super(tableName);
2516    }
2517
2518    @Override
2519    String getOperationType() {
2520      return "CREATE";
2521    }
2522  }
2523
2524  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2525
2526    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2527      super(tableName);
2528    }
2529
2530    @Override
2531    String getOperationType() {
2532      return "ENABLE";
2533    }
2534  }
2535
2536  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2537
2538    DeleteTableProcedureBiConsumer(TableName tableName) {
2539      super(tableName);
2540    }
2541
2542    @Override
2543    String getOperationType() {
2544      return "DELETE";
2545    }
2546
2547    @Override
2548    void onFinished() {
2549      connection.getLocator().clearCache(this.tableName);
2550      super.onFinished();
2551    }
2552  }
2553
2554  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2555
2556    TruncateTableProcedureBiConsumer(TableName tableName) {
2557      super(tableName);
2558    }
2559
2560    @Override
2561    String getOperationType() {
2562      return "TRUNCATE";
2563    }
2564  }
2565
2566  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2567
2568    EnableTableProcedureBiConsumer(TableName tableName) {
2569      super(tableName);
2570    }
2571
2572    @Override
2573    String getOperationType() {
2574      return "ENABLE";
2575    }
2576  }
2577
2578  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2579
2580    DisableTableProcedureBiConsumer(TableName tableName) {
2581      super(tableName);
2582    }
2583
2584    @Override
2585    String getOperationType() {
2586      return "DISABLE";
2587    }
2588  }
2589
2590  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2591
2592    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2593      super(tableName);
2594    }
2595
2596    @Override
2597    String getOperationType() {
2598      return "ADD_COLUMN_FAMILY";
2599    }
2600  }
2601
2602  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2603
2604    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2605      super(tableName);
2606    }
2607
2608    @Override
2609    String getOperationType() {
2610      return "DELETE_COLUMN_FAMILY";
2611    }
2612  }
2613
2614  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2615
2616    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2617      super(tableName);
2618    }
2619
2620    @Override
2621    String getOperationType() {
2622      return "MODIFY_COLUMN_FAMILY";
2623    }
2624  }
2625
2626  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2627
2628    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2629      super(namespaceName);
2630    }
2631
2632    @Override
2633    String getOperationType() {
2634      return "CREATE_NAMESPACE";
2635    }
2636  }
2637
2638  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2639
2640    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2641      super(namespaceName);
2642    }
2643
2644    @Override
2645    String getOperationType() {
2646      return "DELETE_NAMESPACE";
2647    }
2648  }
2649
2650  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2651
2652    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2653      super(namespaceName);
2654    }
2655
2656    @Override
2657    String getOperationType() {
2658      return "MODIFY_NAMESPACE";
2659    }
2660  }
2661
2662  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2663
2664    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2665      super(tableName);
2666    }
2667
2668    @Override
2669    String getOperationType() {
2670      return "MERGE_REGIONS";
2671    }
2672  }
2673
2674  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2675
2676    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2677      super(tableName);
2678    }
2679
2680    @Override
2681    String getOperationType() {
2682      return "SPLIT_REGION";
2683    }
2684  }
2685
2686  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2687    private final String peerId;
2688    private final Supplier<String> getOperation;
2689
2690    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2691      this.peerId = peerId;
2692      this.getOperation = getOperation;
2693    }
2694
2695    String getDescription() {
2696      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2697    }
2698
2699    @Override
2700    void onFinished() {
2701      LOG.info(getDescription() + " completed");
2702    }
2703
2704    @Override
2705    void onError(Throwable error) {
2706      LOG.info(getDescription() + " failed with " + error.getMessage());
2707    }
2708  }
2709
2710  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2711    CompletableFuture<Void> future = new CompletableFuture<>();
2712    addListener(procFuture, (procId, error) -> {
2713      if (error != null) {
2714        future.completeExceptionally(error);
2715        return;
2716      }
2717      getProcedureResult(procId, future, 0);
2718    });
2719    return future;
2720  }
2721
2722  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2723    addListener(
2724      this.<GetProcedureResultResponse> newMasterCaller()
2725        .action((controller, stub) -> this
2726          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2727            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2728            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2729        .call(),
2730      (response, error) -> {
2731        if (error != null) {
2732          LOG.warn("failed to get the procedure result procId={}", procId,
2733            ConnectionUtils.translateException(error));
2734          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2735            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2736          return;
2737        }
2738        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2739          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2740            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2741          return;
2742        }
2743        if (response.hasException()) {
2744          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2745          future.completeExceptionally(ioe);
2746        } else {
2747          future.complete(null);
2748        }
2749      });
2750  }
2751
2752  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2753    CompletableFuture<T> future = new CompletableFuture<>();
2754    future.completeExceptionally(error);
2755    return future;
2756  }
2757
2758  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2759    if (error != null) {
2760      future.completeExceptionally(error);
2761      return true;
2762    }
2763    return false;
2764  }
2765
2766  @Override
2767  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2768    return getClusterMetrics(EnumSet.allOf(Option.class));
2769  }
2770
2771  @Override
2772  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2773    return this
2774        .<ClusterMetrics> newMasterCaller()
2775        .action(
2776          (controller, stub) -> this
2777              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2778                stub, RequestConverter.buildGetClusterStatusRequest(options),
2779                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2780                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2781  }
2782
2783  @Override
2784  public CompletableFuture<Void> shutdown() {
2785    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2786      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2787        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2788        resp -> null))
2789      .call();
2790  }
2791
2792  @Override
2793  public CompletableFuture<Void> stopMaster() {
2794    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2795      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2796        controller, stub, StopMasterRequest.newBuilder().build(),
2797        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2798      .call();
2799  }
2800
2801  @Override
2802  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2803    StopServerRequest request = RequestConverter
2804      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2805    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2806      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2807        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2808        resp -> null))
2809      .serverName(serverName).call();
2810  }
2811
2812  @Override
2813  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2814    return this
2815        .<Void> newAdminCaller()
2816        .action(
2817          (controller, stub) -> this
2818              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2819                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2820                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2821        .serverName(serverName).call();
2822  }
2823
2824  @Override
2825  public CompletableFuture<Void> updateConfiguration() {
2826    CompletableFuture<Void> future = new CompletableFuture<Void>();
2827    addListener(
2828      getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
2829      (status, err) -> {
2830        if (err != null) {
2831          future.completeExceptionally(err);
2832        } else {
2833          List<CompletableFuture<Void>> futures = new ArrayList<>();
2834          status.getLiveServerMetrics().keySet()
2835            .forEach(server -> futures.add(updateConfiguration(server)));
2836          futures.add(updateConfiguration(status.getMasterName()));
2837          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2838          addListener(
2839            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2840            (result, err2) -> {
2841              if (err2 != null) {
2842                future.completeExceptionally(err2);
2843              } else {
2844                future.complete(result);
2845              }
2846            });
2847        }
2848      });
2849    return future;
2850  }
2851
2852  @Override
2853  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2854    return this
2855        .<Void> newAdminCaller()
2856        .action(
2857          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2858            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2859            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2860        .serverName(serverName).call();
2861  }
2862
2863  @Override
2864  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2865    return this
2866        .<Void> newAdminCaller()
2867        .action(
2868          (controller, stub) -> this
2869              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2870                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2871                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2872        .serverName(serverName).call();
2873  }
2874
2875  @Override
2876  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2877    return this
2878        .<List<SecurityCapability>> newMasterCaller()
2879        .action(
2880          (controller, stub) -> this
2881              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
2882                controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
2883                    done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
2884                    .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
2885  }
2886
2887  @Override
2888  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2889    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2890  }
2891
2892  @Override
2893  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2894      TableName tableName) {
2895    Preconditions.checkNotNull(tableName,
2896      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2897    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2898  }
2899
2900  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2901      ServerName serverName) {
2902    return this.<List<RegionMetrics>> newAdminCaller()
2903        .action((controller, stub) -> this
2904            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2905              adminCall(controller, stub, request, (s, c, req, done) ->
2906                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2907        .serverName(serverName).call();
2908  }
2909
2910  @Override
2911  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2912    return this
2913        .<Boolean> newMasterCaller()
2914        .action(
2915          (controller, stub) -> this
2916              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2917                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2918                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2919                resp -> resp.getInMaintenanceMode())).call();
2920  }
2921
2922  @Override
2923  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2924      CompactType compactType) {
2925    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2926
2927    switch (compactType) {
2928      case MOB:
2929        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
2930          if (err != null) {
2931            future.completeExceptionally(err);
2932            return;
2933          }
2934          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2935
2936          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2937            .action((controller, stub) -> this
2938              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2939                controller, stub,
2940                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2941                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2942            .call(), (resp2, err2) -> {
2943              if (err2 != null) {
2944                future.completeExceptionally(err2);
2945              } else {
2946                if (resp2.hasCompactionState()) {
2947                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2948                } else {
2949                  future.complete(CompactionState.NONE);
2950                }
2951              }
2952            });
2953        });
2954        break;
2955      case NORMAL:
2956        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2957          if (err != null) {
2958            future.completeExceptionally(err);
2959            return;
2960          }
2961          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
2962          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2963          locations.stream().filter(loc -> loc.getServerName() != null)
2964            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2965            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2966              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
2967                // If any region compaction state is MAJOR_AND_MINOR
2968                // the table compaction state is MAJOR_AND_MINOR, too.
2969                if (err2 != null) {
2970                  future.completeExceptionally(unwrapCompletionException(err2));
2971                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
2972                  future.complete(regionState);
2973                } else {
2974                  regionStates.add(regionState);
2975                }
2976              }));
2977            });
2978          addListener(
2979            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2980            (ret, err3) -> {
2981              // If future not completed, check all regions's compaction state
2982              if (!future.isCompletedExceptionally() && !future.isDone()) {
2983                CompactionState state = CompactionState.NONE;
2984                for (CompactionState regionState : regionStates) {
2985                  switch (regionState) {
2986                    case MAJOR:
2987                      if (state == CompactionState.MINOR) {
2988                        future.complete(CompactionState.MAJOR_AND_MINOR);
2989                      } else {
2990                        state = CompactionState.MAJOR;
2991                      }
2992                      break;
2993                    case MINOR:
2994                      if (state == CompactionState.MAJOR) {
2995                        future.complete(CompactionState.MAJOR_AND_MINOR);
2996                      } else {
2997                        state = CompactionState.MINOR;
2998                      }
2999                      break;
3000                    case NONE:
3001                    default:
3002                  }
3003                }
3004                if (!future.isDone()) {
3005                  future.complete(state);
3006                }
3007              }
3008            });
3009        });
3010        break;
3011      default:
3012        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3013    }
3014
3015    return future;
3016  }
3017
3018  @Override
3019  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3020    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3021    addListener(getRegionLocation(regionName), (location, err) -> {
3022      if (err != null) {
3023        future.completeExceptionally(err);
3024        return;
3025      }
3026      ServerName serverName = location.getServerName();
3027      if (serverName == null) {
3028        future
3029          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3030        return;
3031      }
3032      addListener(
3033        this.<GetRegionInfoResponse> newAdminCaller()
3034          .action((controller, stub) -> this
3035            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3036              controller, stub,
3037              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3038                true),
3039              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3040          .serverName(serverName).call(),
3041        (resp2, err2) -> {
3042          if (err2 != null) {
3043            future.completeExceptionally(err2);
3044          } else {
3045            if (resp2.hasCompactionState()) {
3046              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3047            } else {
3048              future.complete(CompactionState.NONE);
3049            }
3050          }
3051        });
3052    });
3053    return future;
3054  }
3055
3056  @Override
3057  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3058    MajorCompactionTimestampRequest request =
3059        MajorCompactionTimestampRequest.newBuilder()
3060            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3061    return this
3062        .<Optional<Long>> newMasterCaller()
3063        .action(
3064          (controller, stub) -> this
3065              .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
3066                controller, stub, request,
3067                (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3068                ProtobufUtil::toOptionalTimestamp)).call();
3069  }
3070
3071  @Override
3072  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3073      byte[] regionName) {
3074    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3075    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3076    addListener(getRegionInfo(regionName), (region, err) -> {
3077      if (err != null) {
3078        future.completeExceptionally(err);
3079        return;
3080      }
3081      MajorCompactionTimestampForRegionRequest.Builder builder =
3082        MajorCompactionTimestampForRegionRequest.newBuilder();
3083      builder.setRegion(
3084        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3085      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3086        .<MajorCompactionTimestampForRegionRequest,
3087        MajorCompactionTimestampResponse, Optional<Long>> call(
3088          controller, stub, builder.build(),
3089          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3090          ProtobufUtil::toOptionalTimestamp))
3091        .call(), (timestamp, err2) -> {
3092          if (err2 != null) {
3093            future.completeExceptionally(err2);
3094          } else {
3095            future.complete(timestamp);
3096          }
3097        });
3098    });
3099    return future;
3100  }
3101
3102  @Override
3103  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3104      List<String> serverNamesList) {
3105    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3106    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3107      if (err != null) {
3108        future.completeExceptionally(err);
3109        return;
3110      }
3111      // Accessed by multiple threads.
3112      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3113      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3114      serverNames.stream().forEach(serverName -> {
3115        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3116          if (err2 != null) {
3117            future.completeExceptionally(unwrapCompletionException(err2));
3118          } else {
3119            serverStates.put(serverName, serverState);
3120          }
3121        }));
3122      });
3123      addListener(
3124        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3125        (ret, err3) -> {
3126          if (!future.isCompletedExceptionally()) {
3127            if (err3 != null) {
3128              future.completeExceptionally(err3);
3129            } else {
3130              future.complete(serverStates);
3131            }
3132          }
3133        });
3134    });
3135    return future;
3136  }
3137
3138  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3139    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3140    if (serverNamesList.isEmpty()) {
3141      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3142        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
3143      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3144        if (err != null) {
3145          future.completeExceptionally(err);
3146        } else {
3147          future.complete(new ArrayList<>(clusterMetrics.getLiveServerMetrics().keySet()));
3148        }
3149      });
3150      return future;
3151    } else {
3152      List<ServerName> serverList = new ArrayList<>();
3153      for (String regionServerName : serverNamesList) {
3154        ServerName serverName = null;
3155        try {
3156          serverName = ServerName.valueOf(regionServerName);
3157        } catch (Exception e) {
3158          future.completeExceptionally(
3159            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3160        }
3161        if (serverName == null) {
3162          future.completeExceptionally(
3163            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3164        }
3165      }
3166      future.complete(serverList);
3167    }
3168    return future;
3169  }
3170
3171  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3172    return this
3173        .<Boolean>newAdminCaller()
3174        .serverName(serverName)
3175        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3176            Boolean>adminCall(controller, stub,
3177            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3178            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3179  }
3180
3181  @Override
3182  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3183    return this.<Boolean> newMasterCaller()
3184      .action((controller, stub) -> this
3185        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3186          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3187          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3188          (resp) -> resp.getPrevBalanceValue()))
3189      .call();
3190  }
3191
3192  @Override
3193  public CompletableFuture<Boolean> balance(boolean forcible) {
3194    return this
3195        .<Boolean> newMasterCaller()
3196        .action(
3197          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3198            stub, RequestConverter.buildBalanceRequest(forcible),
3199            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3200  }
3201
3202  @Override
3203  public CompletableFuture<Boolean> isBalancerEnabled() {
3204    return this
3205        .<Boolean> newMasterCaller()
3206        .action(
3207          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
3208            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3209            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3210        .call();
3211  }
3212
3213  @Override
3214  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3215    return this
3216        .<Boolean> newMasterCaller()
3217        .action(
3218          (controller, stub) -> this
3219              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3220                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3221                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3222                    .getPrevNormalizerValue())).call();
3223  }
3224
3225  @Override
3226  public CompletableFuture<Boolean> isNormalizerEnabled() {
3227    return this
3228        .<Boolean> newMasterCaller()
3229        .action(
3230          (controller, stub) -> this
3231              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3232                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3233                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3234                (resp) -> resp.getEnabled())).call();
3235  }
3236
3237  @Override
3238  public CompletableFuture<Boolean> normalize() {
3239    return this
3240        .<Boolean> newMasterCaller()
3241        .action(
3242          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3243            controller, stub, RequestConverter.buildNormalizeRequest(),
3244            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3245        .call();
3246  }
3247
3248  @Override
3249  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3250    return this
3251        .<Boolean> newMasterCaller()
3252        .action(
3253          (controller, stub) -> this
3254              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3255                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3256                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3257                    .getPrevValue())).call();
3258  }
3259
3260  @Override
3261  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3262    return this
3263        .<Boolean> newMasterCaller()
3264        .action(
3265          (controller, stub) -> this
3266              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3267                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3268                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3269        .call();
3270  }
3271
3272  @Override
3273  public CompletableFuture<Boolean> runCleanerChore() {
3274    return this
3275        .<Boolean> newMasterCaller()
3276        .action(
3277          (controller, stub) -> this
3278              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3279                RequestConverter.buildRunCleanerChoreRequest(),
3280                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3281                (resp) -> resp.getCleanerChoreRan())).call();
3282  }
3283
3284  @Override
3285  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3286    return this
3287        .<Boolean> newMasterCaller()
3288        .action(
3289          (controller, stub) -> this
3290              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3291                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3292                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3293                    .getPrevValue())).call();
3294  }
3295
3296  @Override
3297  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3298    return this
3299        .<Boolean> newMasterCaller()
3300        .action(
3301          (controller, stub) -> this
3302              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3303                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3304                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3305                    .getValue())).call();
3306  }
3307
3308  @Override
3309  public CompletableFuture<Integer> runCatalogJanitor() {
3310    return this
3311        .<Integer> newMasterCaller()
3312        .action(
3313          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3314            controller, stub, RequestConverter.buildCatalogScanRequest(),
3315            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3316        .call();
3317  }
3318
3319  @Override
3320  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3321      ServiceCaller<S, R> callable) {
3322    MasterCoprocessorRpcChannelImpl channel =
3323        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3324    S stub = stubMaker.apply(channel);
3325    CompletableFuture<R> future = new CompletableFuture<>();
3326    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3327    callable.call(stub, controller, resp -> {
3328      if (controller.failed()) {
3329        future.completeExceptionally(controller.getFailed());
3330      } else {
3331        future.complete(resp);
3332      }
3333    });
3334    return future;
3335  }
3336
3337  @Override
3338  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3339      ServiceCaller<S, R> callable, ServerName serverName) {
3340    RegionServerCoprocessorRpcChannelImpl channel =
3341        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3342          serverName));
3343    S stub = stubMaker.apply(channel);
3344    CompletableFuture<R> future = new CompletableFuture<>();
3345    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3346    callable.call(stub, controller, resp -> {
3347      if (controller.failed()) {
3348        future.completeExceptionally(controller.getFailed());
3349      } else {
3350        future.complete(resp);
3351      }
3352    });
3353    return future;
3354  }
3355
3356  @Override
3357  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3358    return this.<List<ServerName>> newMasterCaller()
3359      .action((controller, stub) -> this
3360        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3361          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3362          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3363          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3364      .call();
3365  }
3366
3367  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3368    return this.connection.callerFactory.<T> serverRequest()
3369      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3370      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3371      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3372      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3373  }
3374
3375  @Override
3376  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3377    if (tableName == null) {
3378      return failedFuture(new IllegalArgumentException("Table name is null"));
3379    }
3380    CompletableFuture<Void> future = new CompletableFuture<>();
3381    addListener(tableExists(tableName), (exist, err) -> {
3382      if (err != null) {
3383        future.completeExceptionally(err);
3384        return;
3385      }
3386      if (!exist) {
3387        future.completeExceptionally(new TableNotFoundException(
3388          "Table '" + tableName.getNameAsString() + "' does not exists."));
3389        return;
3390      }
3391      addListener(getTableSplits(tableName), (splits, err1) -> {
3392        if (err1 != null) {
3393          future.completeExceptionally(err1);
3394        } else {
3395          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3396            if (err2 != null) {
3397              future.completeExceptionally(err2);
3398            } else {
3399              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3400                if (err3 != null) {
3401                  future.completeExceptionally(err3);
3402                } else {
3403                  future.complete(result3);
3404                }
3405              });
3406            }
3407          });
3408        }
3409      });
3410    });
3411    return future;
3412  }
3413
3414  @Override
3415  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3416    if (tableName == null) {
3417      return failedFuture(new IllegalArgumentException("Table name is null"));
3418    }
3419    CompletableFuture<Void> future = new CompletableFuture<>();
3420    addListener(tableExists(tableName), (exist, err) -> {
3421      if (err != null) {
3422        future.completeExceptionally(err);
3423        return;
3424      }
3425      if (!exist) {
3426        future.completeExceptionally(new TableNotFoundException(
3427          "Table '" + tableName.getNameAsString() + "' does not exists."));
3428        return;
3429      }
3430      addListener(setTableReplication(tableName, false), (result, err2) -> {
3431        if (err2 != null) {
3432          future.completeExceptionally(err2);
3433        } else {
3434          future.complete(result);
3435        }
3436      });
3437    });
3438    return future;
3439  }
3440
3441  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3442    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3443    addListener(
3444      getRegions(tableName).thenApply(regions -> regions.stream()
3445        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3446      (regions, err2) -> {
3447        if (err2 != null) {
3448          future.completeExceptionally(err2);
3449          return;
3450        }
3451        if (regions.size() == 1) {
3452          future.complete(null);
3453        } else {
3454          byte[][] splits = new byte[regions.size() - 1][];
3455          for (int i = 1; i < regions.size(); i++) {
3456            splits[i - 1] = regions.get(i).getStartKey();
3457          }
3458          future.complete(splits);
3459        }
3460      });
3461    return future;
3462  }
3463
3464  /**
3465   * Connect to peer and check the table descriptor on peer:
3466   * <ol>
3467   * <li>Create the same table on peer when not exist.</li>
3468   * <li>Throw an exception if the table already has replication enabled on any of the column
3469   * families.</li>
3470   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3471   * </ol>
3472   * @param tableName name of the table to sync to the peer
3473   * @param splits table split keys
3474   */
3475  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3476      byte[][] splits) {
3477    CompletableFuture<Void> future = new CompletableFuture<>();
3478    addListener(listReplicationPeers(), (peers, err) -> {
3479      if (err != null) {
3480        future.completeExceptionally(err);
3481        return;
3482      }
3483      if (peers == null || peers.size() <= 0) {
3484        future.completeExceptionally(
3485          new IllegalArgumentException("Found no peer cluster for replication."));
3486        return;
3487      }
3488      List<CompletableFuture<Void>> futures = new ArrayList<>();
3489      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3490        .forEach(peer -> {
3491          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3492        });
3493      addListener(
3494        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3495        (result, err2) -> {
3496          if (err2 != null) {
3497            future.completeExceptionally(err2);
3498          } else {
3499            future.complete(result);
3500          }
3501        });
3502    });
3503    return future;
3504  }
3505
3506  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3507      ReplicationPeerDescription peer) {
3508    Configuration peerConf = null;
3509    try {
3510      peerConf =
3511        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3512    } catch (IOException e) {
3513      return failedFuture(e);
3514    }
3515    CompletableFuture<Void> future = new CompletableFuture<>();
3516    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3517      if (err != null) {
3518        future.completeExceptionally(err);
3519        return;
3520      }
3521      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3522        if (err1 != null) {
3523          future.completeExceptionally(err1);
3524          return;
3525        }
3526        AsyncAdmin peerAdmin = conn.getAdmin();
3527        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3528          if (err2 != null) {
3529            future.completeExceptionally(err2);
3530            return;
3531          }
3532          if (!exist) {
3533            CompletableFuture<Void> createTableFuture = null;
3534            if (splits == null) {
3535              createTableFuture = peerAdmin.createTable(tableDesc);
3536            } else {
3537              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3538            }
3539            addListener(createTableFuture, (result, err3) -> {
3540              if (err3 != null) {
3541                future.completeExceptionally(err3);
3542              } else {
3543                future.complete(result);
3544              }
3545            });
3546          } else {
3547            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3548              (result, err4) -> {
3549                if (err4 != null) {
3550                  future.completeExceptionally(err4);
3551                } else {
3552                  future.complete(result);
3553                }
3554              });
3555          }
3556        });
3557      });
3558    });
3559    return future;
3560  }
3561
3562  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3563      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3564    CompletableFuture<Void> future = new CompletableFuture<>();
3565    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3566      if (err != null) {
3567        future.completeExceptionally(err);
3568        return;
3569      }
3570      if (peerTableDesc == null) {
3571        future.completeExceptionally(
3572          new IllegalArgumentException("Failed to get table descriptor for table " +
3573            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3574        return;
3575      }
3576      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3577        future.completeExceptionally(new IllegalArgumentException(
3578          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3579            ", but the table descriptors are not same when compared with source cluster." +
3580            " Thus can not enable the table's replication switch."));
3581        return;
3582      }
3583      future.complete(null);
3584    });
3585    return future;
3586  }
3587
3588  /**
3589   * Set the table's replication switch if the table's replication switch is already not set.
3590   * @param tableName name of the table
3591   * @param enableRep is replication switch enable or disable
3592   */
3593  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3594    CompletableFuture<Void> future = new CompletableFuture<>();
3595    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3596      if (err != null) {
3597        future.completeExceptionally(err);
3598        return;
3599      }
3600      if (!tableDesc.matchReplicationScope(enableRep)) {
3601        int scope =
3602          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3603        TableDescriptor newTableDesc =
3604          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3605        addListener(modifyTable(newTableDesc), (result, err2) -> {
3606          if (err2 != null) {
3607            future.completeExceptionally(err2);
3608          } else {
3609            future.complete(result);
3610          }
3611        });
3612      } else {
3613        future.complete(null);
3614      }
3615    });
3616    return future;
3617  }
3618
3619  @Override
3620  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3621    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3622    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3623      if (err != null) {
3624        future.completeExceptionally(err);
3625        return;
3626      }
3627      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3628        locations.stream().filter(l -> l.getRegion() != null)
3629          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3630          .collect(Collectors.groupingBy(l -> l.getServerName(),
3631            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3632      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3633      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3634      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3635        futures
3636          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3637            if (err2 != null) {
3638              future.completeExceptionally(unwrapCompletionException(err2));
3639            } else {
3640              aggregator.append(stats);
3641            }
3642          }));
3643      }
3644      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3645        (ret, err3) -> {
3646          if (err3 != null) {
3647            future.completeExceptionally(unwrapCompletionException(err3));
3648          } else {
3649            future.complete(aggregator.sum());
3650          }
3651        });
3652    });
3653    return future;
3654  }
3655
3656  @Override
3657  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3658      boolean preserveSplits) {
3659    CompletableFuture<Void> future = new CompletableFuture<>();
3660    addListener(tableExists(tableName), (exist, err) -> {
3661      if (err != null) {
3662        future.completeExceptionally(err);
3663        return;
3664      }
3665      if (!exist) {
3666        future.completeExceptionally(new TableNotFoundException(tableName));
3667        return;
3668      }
3669      addListener(tableExists(newTableName), (exist1, err1) -> {
3670        if (err1 != null) {
3671          future.completeExceptionally(err1);
3672          return;
3673        }
3674        if (exist1) {
3675          future.completeExceptionally(new TableExistsException(newTableName));
3676          return;
3677        }
3678        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3679          if (err2 != null) {
3680            future.completeExceptionally(err2);
3681            return;
3682          }
3683          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3684          if (preserveSplits) {
3685            addListener(getTableSplits(tableName), (splits, err3) -> {
3686              if (err3 != null) {
3687                future.completeExceptionally(err3);
3688              } else {
3689                addListener(
3690                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3691                  (result, err4) -> {
3692                    if (err4 != null) {
3693                      future.completeExceptionally(err4);
3694                    } else {
3695                      future.complete(result);
3696                    }
3697                  });
3698              }
3699            });
3700          } else {
3701            addListener(createTable(newTableDesc), (result, err5) -> {
3702              if (err5 != null) {
3703                future.completeExceptionally(err5);
3704              } else {
3705                future.complete(result);
3706              }
3707            });
3708          }
3709        });
3710      });
3711    });
3712    return future;
3713  }
3714
3715  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3716      List<RegionInfo> hris) {
3717    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3718      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3719        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3720        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3721        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3722      .serverName(serverName).call();
3723  }
3724
3725  @Override
3726  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3727    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3728        .action((controller, stub) -> this
3729            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3730              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3731              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3732              resp -> resp.getPreviousRpcThrottleEnabled()))
3733        .call();
3734    return future;
3735  }
3736
3737  @Override
3738  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3739    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3740        .action((controller, stub) -> this
3741            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3742              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3743              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3744              resp -> resp.getRpcThrottleEnabled()))
3745        .call();
3746    return future;
3747  }
3748
3749  @Override
3750  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3751    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3752        .action((controller, stub) -> this
3753            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3754              controller, stub,
3755              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3756                  .build(),
3757              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3758              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3759        .call();
3760    return future;
3761  }
3762
3763  @Override
3764  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3765    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3766      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3767      Map<TableName, Long>> call(controller, stub,
3768        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3769        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3770        resp -> resp.getSizesList().stream().collect(Collectors
3771          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3772      .call();
3773  }
3774
3775  @Override
3776  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3777      ServerName serverName) {
3778    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3779      .action((controller, stub) -> this
3780        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3781        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3782          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3783          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3784          resp -> resp.getSnapshotsList().stream()
3785            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3786              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3787      .serverName(serverName).call();
3788  }
3789
3790  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3791      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3792    return this.<SpaceQuotaSnapshot> newMasterCaller()
3793      .action((controller, stub) -> this
3794        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3795          RequestConverter.buildGetQuotaStatesRequest(),
3796          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3797      .call();
3798  }
3799
3800  @Override
3801  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3802    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3803      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3804      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3805  }
3806
3807  @Override
3808  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3809    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3810    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3811      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3812      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3813  }
3814
3815  @Override
3816  public CompletableFuture<Void> grant(UserPermission userPermission,
3817      boolean mergeExistingPermissions) {
3818    return this.<Void> newMasterCaller()
3819        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3820          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3821          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3822        .call();
3823  }
3824
3825  @Override
3826  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3827    return this.<Void> newMasterCaller()
3828        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3829          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3830          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3831        .call();
3832  }
3833
3834  @Override
3835  public CompletableFuture<List<UserPermission>>
3836      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3837    return this.<List<UserPermission>> newMasterCaller().action((controller,
3838        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3839            List<UserPermission>> call(controller, stub,
3840              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3841              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3842              resp -> resp.getUserPermissionList().stream()
3843                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3844                .collect(Collectors.toList())))
3845        .call();
3846  }
3847
3848  @Override
3849  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3850      List<Permission> permissions) {
3851    return this.<List<Boolean>> newMasterCaller()
3852        .action((controller, stub) -> this
3853            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3854              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3855              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3856              resp -> resp.getHasUserPermissionList()))
3857        .call();
3858  }
3859}