001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import com.google.protobuf.Message;
026import com.google.protobuf.RpcChannel;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MetaTableAccessor;
060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
061import org.apache.hadoop.hbase.NamespaceDescriptor;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.UnknownRegionException;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.Scan.ReadType;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.DeserializationException;
080import org.apache.hadoop.hbase.ipc.HBaseRpcController;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.replication.SyncReplicationState;
089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
090import org.apache.hadoop.hbase.security.access.Permission;
091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
092import org.apache.hadoop.hbase.security.access.UserPermission;
093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.util.Bytes;
097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
098import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
107import org.apache.hbase.thirdparty.io.netty.util.Timeout;
108import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
109
110import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
111import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
210    .IsSnapshotCleanupEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
262    .SetSnapshotCleanupResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
305
306/**
307 * The implementation of AsyncAdmin.
308 * <p>
309 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
310 * be finished inside the rpc framework thread, which means that the callbacks registered to the
311 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
312 * this class should not try to do time consuming tasks in the callbacks.
313 * @since 2.0.0
314 * @see AsyncHBaseAdmin
315 * @see AsyncConnection#getAdmin()
316 * @see AsyncConnection#getAdminBuilder()
317 */
318@InterfaceAudience.Private
319class RawAsyncHBaseAdmin implements AsyncAdmin {
320  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
321
322  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
323
324  private final AsyncConnectionImpl connection;
325
326  private final HashedWheelTimer retryTimer;
327
328  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
329
330  private final long rpcTimeoutNs;
331
332  private final long operationTimeoutNs;
333
334  private final long pauseNs;
335
336  private final long pauseForCQTBENs;
337
338  private final int maxAttempts;
339
340  private final int startLogErrorsCnt;
341
342  private final NonceGenerator ng;
343
344  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
345      AsyncAdminBuilderBase builder) {
346    this.connection = connection;
347    this.retryTimer = retryTimer;
348    this.metaTable = connection.getTable(META_TABLE_NAME);
349    this.rpcTimeoutNs = builder.rpcTimeoutNs;
350    this.operationTimeoutNs = builder.operationTimeoutNs;
351    this.pauseNs = builder.pauseNs;
352    if (builder.pauseForCQTBENs < builder.pauseNs) {
353      LOG.warn(
354        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
355          " the normal pause value {} ms, use the greater one instead",
356        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
357        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
358      this.pauseForCQTBENs = builder.pauseNs;
359    } else {
360      this.pauseForCQTBENs = builder.pauseForCQTBENs;
361    }
362    this.maxAttempts = builder.maxAttempts;
363    this.startLogErrorsCnt = builder.startLogErrorsCnt;
364    this.ng = connection.getNonceGenerator();
365  }
366
367  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
368    return this.connection.callerFactory.<T> masterRequest()
369      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
370      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
371      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
372      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
373  }
374
375  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
376    return this.connection.callerFactory.<T> adminRequest()
377      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
378      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
379      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
380      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
381  }
382
383  @FunctionalInterface
384  private interface MasterRpcCall<RESP, REQ> {
385    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
386        RpcCallback<RESP> done);
387  }
388
389  @FunctionalInterface
390  private interface AdminRpcCall<RESP, REQ> {
391    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
392        RpcCallback<RESP> done);
393  }
394
395  @FunctionalInterface
396  private interface Converter<D, S> {
397    D convert(S src) throws IOException;
398  }
399
400  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
401      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
402      Converter<RESP, PRESP> respConverter) {
403    CompletableFuture<RESP> future = new CompletableFuture<>();
404    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
405
406      @Override
407      public void run(PRESP resp) {
408        if (controller.failed()) {
409          future.completeExceptionally(controller.getFailed());
410        } else {
411          try {
412            future.complete(respConverter.convert(resp));
413          } catch (IOException e) {
414            future.completeExceptionally(e);
415          }
416        }
417      }
418    });
419    return future;
420  }
421
422  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
423      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
424      Converter<RESP, PRESP> respConverter) {
425    CompletableFuture<RESP> future = new CompletableFuture<>();
426    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
427
428      @Override
429      public void run(PRESP resp) {
430        if (controller.failed()) {
431          future.completeExceptionally(controller.getFailed());
432        } else {
433          try {
434            future.complete(respConverter.convert(resp));
435          } catch (IOException e) {
436            future.completeExceptionally(e);
437          }
438        }
439      }
440    });
441    return future;
442  }
443
444  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
445      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
446      ProcedureBiConsumer consumer) {
447    return procedureCall(b -> {
448    }, preq, rpcCall, respConverter, consumer);
449  }
450
451  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
452      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
453      ProcedureBiConsumer consumer) {
454    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
455  }
456
457  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
458      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
459      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
460      ProcedureBiConsumer consumer) {
461    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
462        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
463    prioritySetter.accept(builder);
464    CompletableFuture<Long> procFuture = builder.call();
465    CompletableFuture<Void> future = waitProcedureResult(procFuture);
466    addListener(future, consumer);
467    return future;
468  }
469
470  @FunctionalInterface
471  private interface TableOperator {
472    CompletableFuture<Void> operate(TableName table);
473  }
474
475  @Override
476  public CompletableFuture<Boolean> tableExists(TableName tableName) {
477    if (TableName.isMetaTableName(tableName)) {
478      return CompletableFuture.completedFuture(true);
479    }
480    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
481  }
482
483  @Override
484  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
485    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
486      includeSysTables));
487  }
488
489  /**
490   * {@link #listTableDescriptors(boolean)}
491   */
492  @Override
493  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
494      boolean includeSysTables) {
495    Preconditions.checkNotNull(pattern,
496      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
497    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
498      includeSysTables));
499  }
500
501  @Override
502  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
503    Preconditions.checkNotNull(tableNames,
504      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
505    if (tableNames.isEmpty()) {
506      return CompletableFuture.completedFuture(Collections.emptyList());
507    }
508    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
509  }
510
511  private CompletableFuture<List<TableDescriptor>>
512      getTableDescriptors(GetTableDescriptorsRequest request) {
513    return this.<List<TableDescriptor>> newMasterCaller()
514        .action((controller, stub) -> this
515            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
516              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
517              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
518        .call();
519  }
520
521  @Override
522  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
523    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
524  }
525
526  @Override
527  public CompletableFuture<List<TableName>>
528      listTableNames(Pattern pattern, boolean includeSysTables) {
529    Preconditions.checkNotNull(pattern,
530        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
531    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
532  }
533
534  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
535    return this
536        .<List<TableName>> newMasterCaller()
537        .action(
538          (controller, stub) -> this
539              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
540                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
541                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
542  }
543
544  @Override
545  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
546    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
547        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
548        List<TableDescriptor>> call(
549          controller, stub,
550          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
551          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
552          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
553        .call();
554  }
555
556  @Override
557  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
558    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
559        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
560        List<TableName>> call(
561          controller, stub,
562          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
563          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
564          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
565        .call();
566  }
567
568  @Override
569  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
570    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
571    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
572      .action((controller, stub) -> this
573        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
574          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
575          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
576          (resp) -> resp.getTableSchemaList()))
577      .call(), (tableSchemas, error) -> {
578        if (error != null) {
579          future.completeExceptionally(error);
580          return;
581        }
582        if (!tableSchemas.isEmpty()) {
583          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
584        } else {
585          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
586        }
587      });
588    return future;
589  }
590
591  @Override
592  public CompletableFuture<Void> createTable(TableDescriptor desc) {
593    return createTable(desc.getTableName(),
594      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
595  }
596
597  @Override
598  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
599      int numRegions) {
600    try {
601      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
602    } catch (IllegalArgumentException e) {
603      return failedFuture(e);
604    }
605  }
606
607  @Override
608  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
609    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
610        + " use createTable(TableDescriptor) instead");
611    try {
612      verifySplitKeys(splitKeys);
613      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
614        splitKeys, ng.getNonceGroup(), ng.newNonce()));
615    } catch (IllegalArgumentException e) {
616      return failedFuture(e);
617    }
618  }
619
620  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
621    Preconditions.checkNotNull(tableName, "table name is null");
622    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
623      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
624      new CreateTableProcedureBiConsumer(tableName));
625  }
626
627  @Override
628  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
629    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
630      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
631        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
632      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
633  }
634
635  @Override
636  public CompletableFuture<Void> deleteTable(TableName tableName) {
637    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
638      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
639      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
640      new DeleteTableProcedureBiConsumer(tableName));
641  }
642
643  @Override
644  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
645    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
646      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
647        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
648      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
649  }
650
651  @Override
652  public CompletableFuture<Void> enableTable(TableName tableName) {
653    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
654      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
655      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
656      new EnableTableProcedureBiConsumer(tableName));
657  }
658
659  @Override
660  public CompletableFuture<Void> disableTable(TableName tableName) {
661    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
662      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
663      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
664      new DisableTableProcedureBiConsumer(tableName));
665  }
666
667  @Override
668  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
669    if (TableName.isMetaTableName(tableName)) {
670      return CompletableFuture.completedFuture(true);
671    }
672    CompletableFuture<Boolean> future = new CompletableFuture<>();
673    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
674      if (error != null) {
675        future.completeExceptionally(error);
676        return;
677      }
678      if (state.isPresent()) {
679        future.complete(state.get().inStates(TableState.State.ENABLED));
680      } else {
681        future.completeExceptionally(new TableNotFoundException(tableName));
682      }
683    });
684    return future;
685  }
686
687  @Override
688  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
689    if (TableName.isMetaTableName(tableName)) {
690      return CompletableFuture.completedFuture(false);
691    }
692    CompletableFuture<Boolean> future = new CompletableFuture<>();
693    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
694      if (error != null) {
695        future.completeExceptionally(error);
696        return;
697      }
698      if (state.isPresent()) {
699        future.complete(state.get().inStates(TableState.State.DISABLED));
700      } else {
701        future.completeExceptionally(new TableNotFoundException(tableName));
702      }
703    });
704    return future;
705  }
706
707  @Override
708  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
709    if (TableName.isMetaTableName(tableName)) {
710      return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
711        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
712    }
713    CompletableFuture<Boolean> future = new CompletableFuture<>();
714    addListener(isTableEnabled(tableName), (enabled, error) -> {
715      if (error != null) {
716        if (error instanceof TableNotFoundException) {
717          future.complete(false);
718        } else {
719          future.completeExceptionally(error);
720        }
721        return;
722      }
723      if (!enabled) {
724        future.complete(false);
725      } else {
726        addListener(
727          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
728          (locations, error1) -> {
729            if (error1 != null) {
730              future.completeExceptionally(error1);
731              return;
732            }
733            List<HRegionLocation> notDeployedRegions = locations.stream()
734              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
735            if (notDeployedRegions.size() > 0) {
736              if (LOG.isDebugEnabled()) {
737                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
738              }
739              future.complete(false);
740              return;
741            }
742            future.complete(true);
743          });
744      }
745    });
746    return future;
747  }
748
749  @Override
750  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
751    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
752      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
753        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
754      new AddColumnFamilyProcedureBiConsumer(tableName));
755  }
756
757  @Override
758  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
759    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
760      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
761        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
762      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
763  }
764
765  @Override
766  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
767      ColumnFamilyDescriptor columnFamily) {
768    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
769      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
770        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
771      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
772  }
773
774  @Override
775  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
776    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
777      RequestConverter.buildCreateNamespaceRequest(descriptor),
778      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
779      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
780  }
781
782  @Override
783  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
784    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
785      RequestConverter.buildModifyNamespaceRequest(descriptor),
786      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
787      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
788  }
789
790  @Override
791  public CompletableFuture<Void> deleteNamespace(String name) {
792    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
793      RequestConverter.buildDeleteNamespaceRequest(name),
794      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
795      new DeleteNamespaceProcedureBiConsumer(name));
796  }
797
798  @Override
799  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
800    return this
801        .<NamespaceDescriptor> newMasterCaller()
802        .action(
803          (controller, stub) -> this
804              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
805                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
806                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
807                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
808  }
809
810  @Override
811  public CompletableFuture<List<String>> listNamespaces() {
812    return this
813        .<List<String>> newMasterCaller()
814        .action(
815          (controller, stub) -> this
816              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
817                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
818                  done) -> s.listNamespaces(c, req, done),
819                (resp) -> resp.getNamespaceNameList())).call();
820  }
821
822  @Override
823  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
824    return this
825        .<List<NamespaceDescriptor>> newMasterCaller()
826        .action(
827          (controller, stub) -> this
828              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
829                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
830                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
831                    .toNamespaceDescriptorList(resp))).call();
832  }
833
834  @Override
835  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
836    return this.<List<RegionInfo>> newAdminCaller()
837        .action((controller, stub) -> this
838            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
839              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
840              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
841              resp -> ProtobufUtil.getRegionInfos(resp)))
842        .serverName(serverName).call();
843  }
844
845  @Override
846  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
847    if (tableName.equals(META_TABLE_NAME)) {
848      return connection.registry.getMetaRegionLocation()
849        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
850          .collect(Collectors.toList()));
851    } else {
852      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
853        .thenApply(
854          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
855    }
856  }
857
858  @Override
859  public CompletableFuture<Void> flush(TableName tableName) {
860    CompletableFuture<Void> future = new CompletableFuture<>();
861    addListener(tableExists(tableName), (exists, err) -> {
862      if (err != null) {
863        future.completeExceptionally(err);
864      } else if (!exists) {
865        future.completeExceptionally(new TableNotFoundException(tableName));
866      } else {
867        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
868          if (err2 != null) {
869            future.completeExceptionally(err2);
870          } else if (!tableEnabled) {
871            future.completeExceptionally(new TableNotEnabledException(tableName));
872          } else {
873            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
874              new HashMap<>()), (ret, err3) -> {
875                if (err3 != null) {
876                  future.completeExceptionally(err3);
877                } else {
878                  future.complete(ret);
879                }
880              });
881          }
882        });
883      }
884    });
885    return future;
886  }
887
888  @Override
889  public CompletableFuture<Void> flushRegion(byte[] regionName) {
890    return flushRegionInternal(regionName, false).thenAccept(r -> {
891    });
892  }
893
894  /**
895   * This method is for internal use only, where we need the response of the flush.
896   * <p/>
897   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
898   * API.
899   */
900  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
901      boolean writeFlushWALMarker) {
902    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
903    addListener(getRegionLocation(regionName), (location, err) -> {
904      if (err != null) {
905        future.completeExceptionally(err);
906        return;
907      }
908      ServerName serverName = location.getServerName();
909      if (serverName == null) {
910        future
911          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
912        return;
913      }
914      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
915        if (err2 != null) {
916          future.completeExceptionally(err2);
917        } else {
918          future.complete(ret);
919        }
920      });
921    });
922    return future;
923  }
924
925  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
926      boolean writeFlushWALMarker) {
927    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
928      .action((controller, stub) -> this
929        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
930          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
931          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
932      .call();
933  }
934
935  @Override
936  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
937    CompletableFuture<Void> future = new CompletableFuture<>();
938    addListener(getRegions(sn), (hRegionInfos, err) -> {
939      if (err != null) {
940        future.completeExceptionally(err);
941        return;
942      }
943      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
944      if (hRegionInfos != null) {
945        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
946        })));
947      }
948      addListener(CompletableFuture.allOf(
949        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
950          if (err2 != null) {
951            future.completeExceptionally(err2);
952          } else {
953            future.complete(ret);
954          }
955        });
956    });
957    return future;
958  }
959
960  @Override
961  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
962    return compact(tableName, null, false, compactType);
963  }
964
965  @Override
966  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
967      CompactType compactType) {
968    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
969        + "If you don't specify a columnFamily, use compact(TableName) instead");
970    return compact(tableName, columnFamily, false, compactType);
971  }
972
973  @Override
974  public CompletableFuture<Void> compactRegion(byte[] regionName) {
975    return compactRegion(regionName, null, false);
976  }
977
978  @Override
979  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
980    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
981        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
982    return compactRegion(regionName, columnFamily, false);
983  }
984
985  @Override
986  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
987    return compact(tableName, null, true, compactType);
988  }
989
990  @Override
991  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
992      CompactType compactType) {
993    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
994        + "If you don't specify a columnFamily, use compact(TableName) instead");
995    return compact(tableName, columnFamily, true, compactType);
996  }
997
998  @Override
999  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1000    return compactRegion(regionName, null, true);
1001  }
1002
1003  @Override
1004  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1005    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1006        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1007    return compactRegion(regionName, columnFamily, true);
1008  }
1009
1010  @Override
1011  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1012    return compactRegionServer(sn, false);
1013  }
1014
1015  @Override
1016  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1017    return compactRegionServer(sn, true);
1018  }
1019
1020  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1021    CompletableFuture<Void> future = new CompletableFuture<>();
1022    addListener(getRegions(sn), (hRegionInfos, err) -> {
1023      if (err != null) {
1024        future.completeExceptionally(err);
1025        return;
1026      }
1027      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1028      if (hRegionInfos != null) {
1029        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1030      }
1031      addListener(CompletableFuture.allOf(
1032        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1033          if (err2 != null) {
1034            future.completeExceptionally(err2);
1035          } else {
1036            future.complete(ret);
1037          }
1038        });
1039    });
1040    return future;
1041  }
1042
1043  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1044      boolean major) {
1045    CompletableFuture<Void> future = new CompletableFuture<>();
1046    addListener(getRegionLocation(regionName), (location, err) -> {
1047      if (err != null) {
1048        future.completeExceptionally(err);
1049        return;
1050      }
1051      ServerName serverName = location.getServerName();
1052      if (serverName == null) {
1053        future
1054          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1055        return;
1056      }
1057      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1058        (ret, err2) -> {
1059          if (err2 != null) {
1060            future.completeExceptionally(err2);
1061          } else {
1062            future.complete(ret);
1063          }
1064        });
1065    });
1066    return future;
1067  }
1068
1069  /**
1070   * List all region locations for the specific table.
1071   */
1072  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1073    if (TableName.META_TABLE_NAME.equals(tableName)) {
1074      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1075      // For meta table, we use zk to fetch all locations.
1076      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
1077      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
1078        if (err != null) {
1079          future.completeExceptionally(err);
1080        } else if (metaRegions == null || metaRegions.isEmpty() ||
1081          metaRegions.getDefaultRegionLocation() == null) {
1082          future.completeExceptionally(new IOException("meta region does not found"));
1083        } else {
1084          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1085        }
1086        // close the registry.
1087        IOUtils.closeQuietly(registry);
1088      });
1089      return future;
1090    } else {
1091      // For non-meta table, we fetch all locations by scanning hbase:meta table
1092      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
1093    }
1094  }
1095
1096  /**
1097   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1098   */
1099  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1100      CompactType compactType) {
1101    CompletableFuture<Void> future = new CompletableFuture<>();
1102
1103    switch (compactType) {
1104      case MOB:
1105        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
1106          if (err != null) {
1107            future.completeExceptionally(err);
1108            return;
1109          }
1110          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1111          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1112            if (err2 != null) {
1113              future.completeExceptionally(err2);
1114            } else {
1115              future.complete(ret);
1116            }
1117          });
1118        });
1119        break;
1120      case NORMAL:
1121        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1122          if (err != null) {
1123            future.completeExceptionally(err);
1124            return;
1125          }
1126          if (locations == null || locations.isEmpty()) {
1127            future.completeExceptionally(new TableNotFoundException(tableName));
1128          }
1129          CompletableFuture<?>[] compactFutures =
1130            locations.stream().filter(l -> l.getRegion() != null)
1131              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1132              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1133              .toArray(CompletableFuture<?>[]::new);
1134          // future complete unless all of the compact futures are completed.
1135          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1136            if (err2 != null) {
1137              future.completeExceptionally(err2);
1138            } else {
1139              future.complete(ret);
1140            }
1141          });
1142        });
1143        break;
1144      default:
1145        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1146    }
1147    return future;
1148  }
1149
1150  /**
1151   * Compact the region at specific region server.
1152   */
1153  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1154      final boolean major, byte[] columnFamily) {
1155    return this
1156        .<Void> newAdminCaller()
1157        .serverName(sn)
1158        .action(
1159          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1160            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1161              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1162            resp -> null)).call();
1163  }
1164
1165  private byte[] toEncodeRegionName(byte[] regionName) {
1166    try {
1167      return RegionInfo.isEncodedRegionName(regionName) ? regionName
1168          : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1169    } catch (IOException e) {
1170      return regionName;
1171    }
1172  }
1173
1174  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1175      CompletableFuture<TableName> result) {
1176    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1177      if (err != null) {
1178        result.completeExceptionally(err);
1179        return;
1180      }
1181      RegionInfo regionInfo = location.getRegion();
1182      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1183        result.completeExceptionally(
1184          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1185        return;
1186      }
1187      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1188        if (!tableName.get().equals(regionInfo.getTable())) {
1189          // tables of this two region should be same.
1190          result.completeExceptionally(
1191            new IllegalArgumentException("Cannot merge regions from two different tables " +
1192              tableName.get() + " and " + regionInfo.getTable()));
1193        } else {
1194          result.complete(tableName.get());
1195        }
1196      }
1197    });
1198  }
1199
1200  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1201    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1202    CompletableFuture<TableName> future = new CompletableFuture<>();
1203    for (byte[] encodedRegionName : encodedRegionNames) {
1204      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1205    }
1206    return future;
1207  }
1208
1209  @Override
1210  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1211    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1212  }
1213
1214  @Override
1215  public CompletableFuture<Boolean> isMergeEnabled() {
1216    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1217  }
1218
1219  @Override
1220  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1221    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1222  }
1223
1224  @Override
1225  public CompletableFuture<Boolean> isSplitEnabled() {
1226    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1227  }
1228
1229  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1230      MasterSwitchType switchType) {
1231    SetSplitOrMergeEnabledRequest request =
1232      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1233    return this.<Boolean> newMasterCaller()
1234      .action((controller, stub) -> this
1235        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1236          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1237          (resp) -> resp.getPrevValueList().get(0)))
1238      .call();
1239  }
1240
1241  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1242    IsSplitOrMergeEnabledRequest request =
1243        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1244    return this
1245        .<Boolean> newMasterCaller()
1246        .action(
1247          (controller, stub) -> this
1248              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1249                controller, stub, request,
1250                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1251                (resp) -> resp.getEnabled())).call();
1252  }
1253
1254  @Override
1255  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1256    if (nameOfRegionsToMerge.size() < 2) {
1257      return failedFuture(new IllegalArgumentException(
1258        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1259    }
1260    CompletableFuture<Void> future = new CompletableFuture<>();
1261    byte[][] encodedNameOfRegionsToMerge =
1262      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1263
1264    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1265      if (err != null) {
1266        future.completeExceptionally(err);
1267        return;
1268      }
1269
1270      MergeTableRegionsRequest request = null;
1271      try {
1272        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1273          forcible, ng.getNonceGroup(), ng.newNonce());
1274      } catch (DeserializationException e) {
1275        future.completeExceptionally(e);
1276        return;
1277      }
1278
1279      addListener(
1280        this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
1281          (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1282          new MergeTableRegionProcedureBiConsumer(tableName)),
1283        (ret, err2) -> {
1284          if (err2 != null) {
1285            future.completeExceptionally(err2);
1286          } else {
1287            future.complete(ret);
1288          }
1289        });
1290    });
1291    return future;
1292  }
1293
1294  @Override
1295  public CompletableFuture<Void> split(TableName tableName) {
1296    CompletableFuture<Void> future = new CompletableFuture<>();
1297    addListener(tableExists(tableName), (exist, error) -> {
1298      if (error != null) {
1299        future.completeExceptionally(error);
1300        return;
1301      }
1302      if (!exist) {
1303        future.completeExceptionally(new TableNotFoundException(tableName));
1304        return;
1305      }
1306      addListener(
1307        metaTable
1308          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1309            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1310            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1311        (results, err2) -> {
1312          if (err2 != null) {
1313            future.completeExceptionally(err2);
1314            return;
1315          }
1316          if (results != null && !results.isEmpty()) {
1317            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1318            for (Result r : results) {
1319              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1320                continue;
1321              }
1322              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1323              if (rl != null) {
1324                for (HRegionLocation h : rl.getRegionLocations()) {
1325                  if (h != null && h.getServerName() != null) {
1326                    RegionInfo hri = h.getRegion();
1327                    if (hri == null || hri.isSplitParent() ||
1328                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1329                      continue;
1330                    }
1331                    splitFutures.add(split(hri, null));
1332                  }
1333                }
1334              }
1335            }
1336            addListener(
1337              CompletableFuture
1338                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1339              (ret, exception) -> {
1340                if (exception != null) {
1341                  future.completeExceptionally(exception);
1342                  return;
1343                }
1344                future.complete(ret);
1345              });
1346          } else {
1347            future.complete(null);
1348          }
1349        });
1350    });
1351    return future;
1352  }
1353
1354  @Override
1355  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1356    CompletableFuture<Void> result = new CompletableFuture<>();
1357    if (splitPoint == null) {
1358      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1359    }
1360    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1361      (loc, err) -> {
1362        if (err != null) {
1363          result.completeExceptionally(err);
1364        } else if (loc == null || loc.getRegion() == null) {
1365          result.completeExceptionally(new IllegalArgumentException(
1366            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1367        } else {
1368          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1369            if (err2 != null) {
1370              result.completeExceptionally(err2);
1371            } else {
1372              result.complete(ret);
1373            }
1374
1375          });
1376        }
1377      });
1378    return result;
1379  }
1380
1381  @Override
1382  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1383    CompletableFuture<Void> future = new CompletableFuture<>();
1384    addListener(getRegionLocation(regionName), (location, err) -> {
1385      if (err != null) {
1386        future.completeExceptionally(err);
1387        return;
1388      }
1389      RegionInfo regionInfo = location.getRegion();
1390      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1391        future
1392          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1393            "Replicas are auto-split when their primary is split."));
1394        return;
1395      }
1396      ServerName serverName = location.getServerName();
1397      if (serverName == null) {
1398        future
1399          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1400        return;
1401      }
1402      addListener(split(regionInfo, null), (ret, err2) -> {
1403        if (err2 != null) {
1404          future.completeExceptionally(err2);
1405        } else {
1406          future.complete(ret);
1407        }
1408      });
1409    });
1410    return future;
1411  }
1412
1413  @Override
1414  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1415    Preconditions.checkNotNull(splitPoint,
1416      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1417    CompletableFuture<Void> future = new CompletableFuture<>();
1418    addListener(getRegionLocation(regionName), (location, err) -> {
1419      if (err != null) {
1420        future.completeExceptionally(err);
1421        return;
1422      }
1423      RegionInfo regionInfo = location.getRegion();
1424      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1425        future
1426          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1427            "Replicas are auto-split when their primary is split."));
1428        return;
1429      }
1430      ServerName serverName = location.getServerName();
1431      if (serverName == null) {
1432        future
1433          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1434        return;
1435      }
1436      if (regionInfo.getStartKey() != null &&
1437        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1438        future.completeExceptionally(
1439          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1440        return;
1441      }
1442      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1443        if (err2 != null) {
1444          future.completeExceptionally(err2);
1445        } else {
1446          future.complete(ret);
1447        }
1448      });
1449    });
1450    return future;
1451  }
1452
1453  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1454    CompletableFuture<Void> future = new CompletableFuture<>();
1455    TableName tableName = hri.getTable();
1456    SplitTableRegionRequest request = null;
1457    try {
1458      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1459        ng.newNonce());
1460    } catch (DeserializationException e) {
1461      future.completeExceptionally(e);
1462      return future;
1463    }
1464
1465    addListener(
1466      this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
1467        request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1468        new SplitTableRegionProcedureBiConsumer(tableName)),
1469      (ret, err2) -> {
1470        if (err2 != null) {
1471          future.completeExceptionally(err2);
1472        } else {
1473          future.complete(ret);
1474        }
1475      });
1476    return future;
1477  }
1478
1479  @Override
1480  public CompletableFuture<Void> assign(byte[] regionName) {
1481    CompletableFuture<Void> future = new CompletableFuture<>();
1482    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1483      if (err != null) {
1484        future.completeExceptionally(err);
1485        return;
1486      }
1487      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1488        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1489          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1490          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1491        .call(), (ret, err2) -> {
1492          if (err2 != null) {
1493            future.completeExceptionally(err2);
1494          } else {
1495            future.complete(ret);
1496          }
1497        });
1498    });
1499    return future;
1500  }
1501
1502  @Override
1503  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1504    CompletableFuture<Void> future = new CompletableFuture<>();
1505    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1506      if (err != null) {
1507        future.completeExceptionally(err);
1508        return;
1509      }
1510      addListener(
1511        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1512          .action(((controller, stub) -> this
1513            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1514              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1515              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1516          .call(),
1517        (ret, err2) -> {
1518          if (err2 != null) {
1519            future.completeExceptionally(err2);
1520          } else {
1521            future.complete(ret);
1522          }
1523        });
1524    });
1525    return future;
1526  }
1527
1528  @Override
1529  public CompletableFuture<Void> offline(byte[] regionName) {
1530    CompletableFuture<Void> future = new CompletableFuture<>();
1531    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1532      if (err != null) {
1533        future.completeExceptionally(err);
1534        return;
1535      }
1536      addListener(
1537        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1538          .action(((controller, stub) -> this
1539            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1540              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1541              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1542          .call(),
1543        (ret, err2) -> {
1544          if (err2 != null) {
1545            future.completeExceptionally(err2);
1546          } else {
1547            future.complete(ret);
1548          }
1549        });
1550    });
1551    return future;
1552  }
1553
1554  @Override
1555  public CompletableFuture<Void> move(byte[] regionName) {
1556    CompletableFuture<Void> future = new CompletableFuture<>();
1557    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1558      if (err != null) {
1559        future.completeExceptionally(err);
1560        return;
1561      }
1562      addListener(
1563        moveRegion(regionInfo,
1564          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1565        (ret, err2) -> {
1566          if (err2 != null) {
1567            future.completeExceptionally(err2);
1568          } else {
1569            future.complete(ret);
1570          }
1571        });
1572    });
1573    return future;
1574  }
1575
1576  @Override
1577  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1578    Preconditions.checkNotNull(destServerName,
1579      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1580    CompletableFuture<Void> future = new CompletableFuture<>();
1581    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1582      if (err != null) {
1583        future.completeExceptionally(err);
1584        return;
1585      }
1586      addListener(
1587        moveRegion(regionInfo, RequestConverter
1588          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1589        (ret, err2) -> {
1590          if (err2 != null) {
1591            future.completeExceptionally(err2);
1592          } else {
1593            future.complete(ret);
1594          }
1595        });
1596    });
1597    return future;
1598  }
1599
1600  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1601    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1602      .action(
1603        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1604          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1605      .call();
1606  }
1607
1608  @Override
1609  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1610    return this
1611        .<Void> newMasterCaller()
1612        .action(
1613          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1614            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1615            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1616  }
1617
1618  @Override
1619  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1620    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1621    Scan scan = QuotaTableUtil.makeScan(filter);
1622    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1623        .scan(scan, new AdvancedScanResultConsumer() {
1624          List<QuotaSettings> settings = new ArrayList<>();
1625
1626          @Override
1627          public void onNext(Result[] results, ScanController controller) {
1628            for (Result result : results) {
1629              try {
1630                QuotaTableUtil.parseResultToCollection(result, settings);
1631              } catch (IOException e) {
1632                controller.terminate();
1633                future.completeExceptionally(e);
1634              }
1635            }
1636          }
1637
1638          @Override
1639          public void onError(Throwable error) {
1640            future.completeExceptionally(error);
1641          }
1642
1643          @Override
1644          public void onComplete() {
1645            future.complete(settings);
1646          }
1647        });
1648    return future;
1649  }
1650
1651  @Override
1652  public CompletableFuture<Void> addReplicationPeer(String peerId,
1653      ReplicationPeerConfig peerConfig, boolean enabled) {
1654    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1655      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1656      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1657      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1658  }
1659
1660  @Override
1661  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1662    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1663      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1664      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1665      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1666  }
1667
1668  @Override
1669  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1670    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1671      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1672      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1673      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1674  }
1675
1676  @Override
1677  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1678    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1679      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1680      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1681      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1682  }
1683
1684  @Override
1685  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1686    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1687      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1688        controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1689        (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1690        (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1691      .call();
1692  }
1693
1694  @Override
1695  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1696      ReplicationPeerConfig peerConfig) {
1697    return this
1698      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1699        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1700        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1701        (resp) -> resp.getProcId(),
1702        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1703  }
1704
1705  @Override
1706  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1707      SyncReplicationState clusterState) {
1708    return this
1709      .<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1710        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1711          clusterState),
1712        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1713        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1714          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1715  }
1716
1717  @Override
1718  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1719      Map<TableName, List<String>> tableCfs) {
1720    if (tableCfs == null) {
1721      return failedFuture(new ReplicationException("tableCfs is null"));
1722    }
1723
1724    CompletableFuture<Void> future = new CompletableFuture<Void>();
1725    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1726      if (!completeExceptionally(future, error)) {
1727        ReplicationPeerConfig newPeerConfig =
1728          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1729        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1730          if (!completeExceptionally(future, error)) {
1731            future.complete(result);
1732          }
1733        });
1734      }
1735    });
1736    return future;
1737  }
1738
1739  @Override
1740  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1741      Map<TableName, List<String>> tableCfs) {
1742    if (tableCfs == null) {
1743      return failedFuture(new ReplicationException("tableCfs is null"));
1744    }
1745
1746    CompletableFuture<Void> future = new CompletableFuture<Void>();
1747    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1748      if (!completeExceptionally(future, error)) {
1749        ReplicationPeerConfig newPeerConfig = null;
1750        try {
1751          newPeerConfig = ReplicationPeerConfigUtil
1752            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1753        } catch (ReplicationException e) {
1754          future.completeExceptionally(e);
1755          return;
1756        }
1757        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1758          if (!completeExceptionally(future, error)) {
1759            future.complete(result);
1760          }
1761        });
1762      }
1763    });
1764    return future;
1765  }
1766
1767  @Override
1768  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1769    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1770  }
1771
1772  @Override
1773  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1774    Preconditions.checkNotNull(pattern,
1775      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1776    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1777  }
1778
1779  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1780      ListReplicationPeersRequest request) {
1781    return this
1782        .<List<ReplicationPeerDescription>> newMasterCaller()
1783        .action(
1784          (controller, stub) -> this
1785              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
1786                controller,
1787                stub,
1788                request,
1789                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1790                (resp) -> resp.getPeerDescList().stream()
1791                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1792                    .collect(Collectors.toList()))).call();
1793  }
1794
1795  @Override
1796  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1797    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1798    addListener(listTableDescriptors(), (tables, error) -> {
1799      if (!completeExceptionally(future, error)) {
1800        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1801        tables.forEach(table -> {
1802          Map<String, Integer> cfs = new HashMap<>();
1803          Stream.of(table.getColumnFamilies())
1804            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1805            .forEach(column -> {
1806              cfs.put(column.getNameAsString(), column.getScope());
1807            });
1808          if (!cfs.isEmpty()) {
1809            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1810          }
1811        });
1812        future.complete(replicatedTableCFs);
1813      }
1814    });
1815    return future;
1816  }
1817
1818  @Override
1819  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1820    SnapshotProtos.SnapshotDescription snapshot =
1821      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1822    try {
1823      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1824    } catch (IllegalArgumentException e) {
1825      return failedFuture(e);
1826    }
1827    CompletableFuture<Void> future = new CompletableFuture<>();
1828    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1829    addListener(this.<Long> newMasterCaller()
1830      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1831        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1832        resp -> resp.getExpectedTimeout()))
1833      .call(), (expectedTimeout, err) -> {
1834        if (err != null) {
1835          future.completeExceptionally(err);
1836          return;
1837        }
1838        TimerTask pollingTask = new TimerTask() {
1839          int tries = 0;
1840          long startTime = EnvironmentEdgeManager.currentTime();
1841          long endTime = startTime + expectedTimeout;
1842          long maxPauseTime = expectedTimeout / maxAttempts;
1843
1844          @Override
1845          public void run(Timeout timeout) throws Exception {
1846            if (EnvironmentEdgeManager.currentTime() < endTime) {
1847              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1848                if (err2 != null) {
1849                  future.completeExceptionally(err2);
1850                } else if (done) {
1851                  future.complete(null);
1852                } else {
1853                  // retry again after pauseTime.
1854                  long pauseTime =
1855                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1856                  pauseTime = Math.min(pauseTime, maxPauseTime);
1857                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1858                    TimeUnit.MILLISECONDS);
1859                }
1860              });
1861            } else {
1862              future.completeExceptionally(
1863                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1864                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1865            }
1866          }
1867        };
1868        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1869      });
1870    return future;
1871  }
1872
1873  @Override
1874  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1875    return this
1876        .<Boolean> newMasterCaller()
1877        .action(
1878          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1879            controller,
1880            stub,
1881            IsSnapshotDoneRequest.newBuilder()
1882                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1883                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1884  }
1885
1886  @Override
1887  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1888    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1889      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1890      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1891    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1892  }
1893
1894  @Override
1895  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1896      boolean restoreAcl) {
1897    CompletableFuture<Void> future = new CompletableFuture<>();
1898    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1899      if (err != null) {
1900        future.completeExceptionally(err);
1901        return;
1902      }
1903      TableName tableName = null;
1904      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1905        for (SnapshotDescription snap : snapshotDescriptions) {
1906          if (snap.getName().equals(snapshotName)) {
1907            tableName = snap.getTableName();
1908            break;
1909          }
1910        }
1911      }
1912      if (tableName == null) {
1913        future.completeExceptionally(new RestoreSnapshotException(
1914          "Unable to find the table name for snapshot=" + snapshotName));
1915        return;
1916      }
1917      final TableName finalTableName = tableName;
1918      addListener(tableExists(finalTableName), (exists, err2) -> {
1919        if (err2 != null) {
1920          future.completeExceptionally(err2);
1921        } else if (!exists) {
1922          // if table does not exist, then just clone snapshot into new table.
1923          completeConditionalOnFuture(future,
1924            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1925        } else {
1926          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1927            if (err4 != null) {
1928              future.completeExceptionally(err4);
1929            } else if (!disabled) {
1930              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1931            } else {
1932              completeConditionalOnFuture(future,
1933                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1934            }
1935          });
1936        }
1937      });
1938    });
1939    return future;
1940  }
1941
1942  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1943      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1944    if (takeFailSafeSnapshot) {
1945      CompletableFuture<Void> future = new CompletableFuture<>();
1946      // Step.1 Take a snapshot of the current state
1947      String failSafeSnapshotSnapshotNameFormat =
1948        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1949          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1950      final String failSafeSnapshotSnapshotName =
1951        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1952          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1953          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1954      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1955      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1956        if (err != null) {
1957          future.completeExceptionally(err);
1958        } else {
1959          // Step.2 Restore snapshot
1960          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1961            (void2, err2) -> {
1962              if (err2 != null) {
1963                // Step.3.a Something went wrong during the restore and try to rollback.
1964                addListener(
1965                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1966                  (void3, err3) -> {
1967                    if (err3 != null) {
1968                      future.completeExceptionally(err3);
1969                    } else {
1970                      String msg =
1971                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1972                          failSafeSnapshotSnapshotName + " succeeded.";
1973                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1974                    }
1975                  });
1976              } else {
1977                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1978                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1979                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1980                  if (err3 != null) {
1981                    LOG.error(
1982                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1983                      err3);
1984                  }
1985                  future.complete(ret3);
1986                });
1987              }
1988            });
1989        }
1990      });
1991      return future;
1992    } else {
1993      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
1994    }
1995  }
1996
1997  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
1998      CompletableFuture<T> parentFuture) {
1999    addListener(parentFuture, (res, err) -> {
2000      if (err != null) {
2001        dependentFuture.completeExceptionally(err);
2002      } else {
2003        dependentFuture.complete(res);
2004      }
2005    });
2006  }
2007
2008  @Override
2009  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2010      boolean restoreAcl) {
2011    CompletableFuture<Void> future = new CompletableFuture<>();
2012    addListener(tableExists(tableName), (exists, err) -> {
2013      if (err != null) {
2014        future.completeExceptionally(err);
2015      } else if (exists) {
2016        future.completeExceptionally(new TableExistsException(tableName));
2017      } else {
2018        completeConditionalOnFuture(future,
2019          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2020      }
2021    });
2022    return future;
2023  }
2024
2025  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2026      boolean restoreAcl) {
2027    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2028      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2029    try {
2030      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2031    } catch (IllegalArgumentException e) {
2032      return failedFuture(e);
2033    }
2034    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2035      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2036        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2037          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2038        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2039      .call());
2040  }
2041
2042  @Override
2043  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2044    return getCompletedSnapshots(null);
2045  }
2046
2047  @Override
2048  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2049    Preconditions.checkNotNull(pattern,
2050      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2051    return getCompletedSnapshots(pattern);
2052  }
2053
2054  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2055    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2056        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2057        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2058          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2059          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2060        .call();
2061  }
2062
2063  @Override
2064  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2065    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2066        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2067    return getCompletedSnapshots(tableNamePattern, null);
2068  }
2069
2070  @Override
2071  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2072      Pattern snapshotNamePattern) {
2073    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2074        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2075    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2076        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2077    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2078  }
2079
2080  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2081      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2082    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2083    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2084      if (err != null) {
2085        future.completeExceptionally(err);
2086        return;
2087      }
2088      if (tableNames == null || tableNames.size() <= 0) {
2089        future.complete(Collections.emptyList());
2090        return;
2091      }
2092      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2093        if (err2 != null) {
2094          future.completeExceptionally(err2);
2095          return;
2096        }
2097        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2098          future.complete(Collections.emptyList());
2099          return;
2100        }
2101        future.complete(snapshotDescList.stream()
2102          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2103          .collect(Collectors.toList()));
2104      });
2105    });
2106    return future;
2107  }
2108
2109  @Override
2110  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2111    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2112  }
2113
2114  @Override
2115  public CompletableFuture<Void> deleteSnapshots() {
2116    return internalDeleteSnapshots(null, null);
2117  }
2118
2119  @Override
2120  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2121    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2122        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2123    return internalDeleteSnapshots(null, snapshotNamePattern);
2124  }
2125
2126  @Override
2127  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2128    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2129        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2130    return internalDeleteSnapshots(tableNamePattern, null);
2131  }
2132
2133  @Override
2134  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2135      Pattern snapshotNamePattern) {
2136    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2137        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2138    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2139        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2140    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2141  }
2142
2143  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2144      Pattern snapshotNamePattern) {
2145    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2146    if (tableNamePattern == null) {
2147      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2148    } else {
2149      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2150    }
2151    CompletableFuture<Void> future = new CompletableFuture<>();
2152    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2153      if (err != null) {
2154        future.completeExceptionally(err);
2155        return;
2156      }
2157      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2158        future.complete(null);
2159        return;
2160      }
2161      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2162        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2163          if (e != null) {
2164            future.completeExceptionally(e);
2165          } else {
2166            future.complete(v);
2167          }
2168        });
2169    }));
2170    return future;
2171  }
2172
2173  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2174    return this
2175        .<Void> newMasterCaller()
2176        .action(
2177          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2178            controller,
2179            stub,
2180            DeleteSnapshotRequest.newBuilder()
2181                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2182                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2183  }
2184
2185  @Override
2186  public CompletableFuture<Void> execProcedure(String signature, String instance,
2187      Map<String, String> props) {
2188    CompletableFuture<Void> future = new CompletableFuture<>();
2189    ProcedureDescription procDesc =
2190      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2191    addListener(this.<Long> newMasterCaller()
2192      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2193        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2194        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2195      .call(), (expectedTimeout, err) -> {
2196        if (err != null) {
2197          future.completeExceptionally(err);
2198          return;
2199        }
2200        TimerTask pollingTask = new TimerTask() {
2201          int tries = 0;
2202          long startTime = EnvironmentEdgeManager.currentTime();
2203          long endTime = startTime + expectedTimeout;
2204          long maxPauseTime = expectedTimeout / maxAttempts;
2205
2206          @Override
2207          public void run(Timeout timeout) throws Exception {
2208            if (EnvironmentEdgeManager.currentTime() < endTime) {
2209              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2210                if (err2 != null) {
2211                  future.completeExceptionally(err2);
2212                  return;
2213                }
2214                if (done) {
2215                  future.complete(null);
2216                } else {
2217                  // retry again after pauseTime.
2218                  long pauseTime =
2219                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2220                  pauseTime = Math.min(pauseTime, maxPauseTime);
2221                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2222                    TimeUnit.MICROSECONDS);
2223                }
2224              });
2225            } else {
2226              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2227                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2228            }
2229          }
2230        };
2231        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2232        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2233      });
2234    return future;
2235  }
2236
2237  @Override
2238  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2239      Map<String, String> props) {
2240    ProcedureDescription proDesc =
2241        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2242    return this.<byte[]> newMasterCaller()
2243        .action(
2244          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2245            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2246            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2247            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2248        .call();
2249  }
2250
2251  @Override
2252  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2253      Map<String, String> props) {
2254    ProcedureDescription proDesc =
2255        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2256    return this.<Boolean> newMasterCaller()
2257        .action((controller, stub) -> this
2258            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2259              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2260              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2261        .call();
2262  }
2263
2264  @Override
2265  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2266    return this.<Boolean> newMasterCaller().action(
2267      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2268        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2269        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2270        .call();
2271  }
2272
2273  @Override
2274  public CompletableFuture<String> getProcedures() {
2275    return this
2276        .<String> newMasterCaller()
2277        .action(
2278          (controller, stub) -> this
2279              .<GetProceduresRequest, GetProceduresResponse, String> call(
2280                controller, stub, GetProceduresRequest.newBuilder().build(),
2281                (s, c, req, done) -> s.getProcedures(c, req, done),
2282                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2283  }
2284
2285  @Override
2286  public CompletableFuture<String> getLocks() {
2287    return this
2288        .<String> newMasterCaller()
2289        .action(
2290          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2291            controller, stub, GetLocksRequest.newBuilder().build(),
2292            (s, c, req, done) -> s.getLocks(c, req, done),
2293            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2294  }
2295
2296  @Override
2297  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
2298    return this.<Void> newMasterCaller()
2299        .action((controller, stub) -> this
2300          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2301            controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2302            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2303        .call();
2304  }
2305
2306  @Override
2307  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2308    return this.<List<ServerName>> newMasterCaller()
2309        .action((controller, stub) -> this
2310          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2311            List<ServerName>> call(
2312              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2313              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2314              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2315                  .collect(Collectors.toList())))
2316        .call();
2317  }
2318
2319  @Override
2320  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2321      List<byte[]> encodedRegionNames) {
2322    return this.<Void> newMasterCaller()
2323        .action((controller, stub) -> this
2324          .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
2325            stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2326            (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2327        .call();
2328  }
2329
2330  /**
2331   * Get the region location for the passed region name. The region name may be a full region name
2332   * or encoded region name. If the region does not found, then it'll throw an
2333   * UnknownRegionException wrapped by a {@link CompletableFuture}
2334   * @param regionNameOrEncodedRegionName region name or encoded region name
2335   * @return region location, wrapped by a {@link CompletableFuture}
2336   */
2337  @VisibleForTesting
2338  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2339    if (regionNameOrEncodedRegionName == null) {
2340      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2341    }
2342    try {
2343      CompletableFuture<Optional<HRegionLocation>> future;
2344      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2345        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2346        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2347          // old format encodedName, should be meta region
2348          future = connection.registry.getMetaRegionLocation()
2349            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2350              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2351        } else {
2352          future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2353            regionNameOrEncodedRegionName);
2354        }
2355      } else {
2356        RegionInfo regionInfo =
2357          MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2358        if (regionInfo.isMetaRegion()) {
2359          future = connection.registry.getMetaRegionLocation()
2360            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2361              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2362              .findFirst());
2363        } else {
2364          future =
2365            AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2366        }
2367      }
2368
2369      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2370      addListener(future, (location, err) -> {
2371        if (err != null) {
2372          returnedFuture.completeExceptionally(err);
2373          return;
2374        }
2375        if (!location.isPresent() || location.get().getRegion() == null) {
2376          returnedFuture.completeExceptionally(
2377            new UnknownRegionException("Invalid region name or encoded region name: " +
2378              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2379        } else {
2380          returnedFuture.complete(location.get());
2381        }
2382      });
2383      return returnedFuture;
2384    } catch (IOException e) {
2385      return failedFuture(e);
2386    }
2387  }
2388
2389  /**
2390   * Get the region info for the passed region name. The region name may be a full region name or
2391   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2392   * wrapped by a {@link CompletableFuture}
2393   * @param regionNameOrEncodedRegionName
2394   * @return region info, wrapped by a {@link CompletableFuture}
2395   */
2396  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2397    if (regionNameOrEncodedRegionName == null) {
2398      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2399    }
2400
2401    if (Bytes.equals(regionNameOrEncodedRegionName,
2402      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2403      Bytes.equals(regionNameOrEncodedRegionName,
2404        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2405      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2406    }
2407
2408    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2409    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2410      if (err != null) {
2411        future.completeExceptionally(err);
2412      } else {
2413        future.complete(location.getRegion());
2414      }
2415    });
2416    return future;
2417  }
2418
2419  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2420    if (numRegions < 3) {
2421      throw new IllegalArgumentException("Must create at least three regions");
2422    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2423      throw new IllegalArgumentException("Start key must be smaller than end key");
2424    }
2425    if (numRegions == 3) {
2426      return new byte[][] { startKey, endKey };
2427    }
2428    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2429    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2430      throw new IllegalArgumentException("Unable to split key range into enough regions");
2431    }
2432    return splitKeys;
2433  }
2434
2435  private void verifySplitKeys(byte[][] splitKeys) {
2436    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2437    // Verify there are no duplicate split keys
2438    byte[] lastKey = null;
2439    for (byte[] splitKey : splitKeys) {
2440      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2441        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2442      }
2443      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2444        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2445            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2446      }
2447      lastKey = splitKey;
2448    }
2449  }
2450
2451  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2452
2453    abstract void onFinished();
2454
2455    abstract void onError(Throwable error);
2456
2457    @Override
2458    public void accept(Void v, Throwable error) {
2459      if (error != null) {
2460        onError(error);
2461        return;
2462      }
2463      onFinished();
2464    }
2465  }
2466
2467  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2468    protected final TableName tableName;
2469
2470    TableProcedureBiConsumer(TableName tableName) {
2471      this.tableName = tableName;
2472    }
2473
2474    abstract String getOperationType();
2475
2476    String getDescription() {
2477      return "Operation: " + getOperationType() + ", " + "Table Name: "
2478          + tableName.getNameWithNamespaceInclAsString();
2479    }
2480
2481    @Override
2482    void onFinished() {
2483      LOG.info(getDescription() + " completed");
2484    }
2485
2486    @Override
2487    void onError(Throwable error) {
2488      LOG.info(getDescription() + " failed with " + error.getMessage());
2489    }
2490  }
2491
2492  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2493    protected final String namespaceName;
2494
2495    NamespaceProcedureBiConsumer(String namespaceName) {
2496      this.namespaceName = namespaceName;
2497    }
2498
2499    abstract String getOperationType();
2500
2501    String getDescription() {
2502      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2503    }
2504
2505    @Override
2506    void onFinished() {
2507      LOG.info(getDescription() + " completed");
2508    }
2509
2510    @Override
2511    void onError(Throwable error) {
2512      LOG.info(getDescription() + " failed with " + error.getMessage());
2513    }
2514  }
2515
2516  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2517
2518    CreateTableProcedureBiConsumer(TableName tableName) {
2519      super(tableName);
2520    }
2521
2522    @Override
2523    String getOperationType() {
2524      return "CREATE";
2525    }
2526  }
2527
2528  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2529
2530    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2531      super(tableName);
2532    }
2533
2534    @Override
2535    String getOperationType() {
2536      return "ENABLE";
2537    }
2538  }
2539
2540  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2541
2542    DeleteTableProcedureBiConsumer(TableName tableName) {
2543      super(tableName);
2544    }
2545
2546    @Override
2547    String getOperationType() {
2548      return "DELETE";
2549    }
2550
2551    @Override
2552    void onFinished() {
2553      connection.getLocator().clearCache(this.tableName);
2554      super.onFinished();
2555    }
2556  }
2557
2558  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2559
2560    TruncateTableProcedureBiConsumer(TableName tableName) {
2561      super(tableName);
2562    }
2563
2564    @Override
2565    String getOperationType() {
2566      return "TRUNCATE";
2567    }
2568  }
2569
2570  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2571
2572    EnableTableProcedureBiConsumer(TableName tableName) {
2573      super(tableName);
2574    }
2575
2576    @Override
2577    String getOperationType() {
2578      return "ENABLE";
2579    }
2580  }
2581
2582  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2583
2584    DisableTableProcedureBiConsumer(TableName tableName) {
2585      super(tableName);
2586    }
2587
2588    @Override
2589    String getOperationType() {
2590      return "DISABLE";
2591    }
2592  }
2593
2594  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2595
2596    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2597      super(tableName);
2598    }
2599
2600    @Override
2601    String getOperationType() {
2602      return "ADD_COLUMN_FAMILY";
2603    }
2604  }
2605
2606  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2607
2608    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2609      super(tableName);
2610    }
2611
2612    @Override
2613    String getOperationType() {
2614      return "DELETE_COLUMN_FAMILY";
2615    }
2616  }
2617
2618  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2619
2620    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2621      super(tableName);
2622    }
2623
2624    @Override
2625    String getOperationType() {
2626      return "MODIFY_COLUMN_FAMILY";
2627    }
2628  }
2629
2630  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2631
2632    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2633      super(namespaceName);
2634    }
2635
2636    @Override
2637    String getOperationType() {
2638      return "CREATE_NAMESPACE";
2639    }
2640  }
2641
2642  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2643
2644    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2645      super(namespaceName);
2646    }
2647
2648    @Override
2649    String getOperationType() {
2650      return "DELETE_NAMESPACE";
2651    }
2652  }
2653
2654  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2655
2656    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2657      super(namespaceName);
2658    }
2659
2660    @Override
2661    String getOperationType() {
2662      return "MODIFY_NAMESPACE";
2663    }
2664  }
2665
2666  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2667
2668    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2669      super(tableName);
2670    }
2671
2672    @Override
2673    String getOperationType() {
2674      return "MERGE_REGIONS";
2675    }
2676  }
2677
2678  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2679
2680    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2681      super(tableName);
2682    }
2683
2684    @Override
2685    String getOperationType() {
2686      return "SPLIT_REGION";
2687    }
2688  }
2689
2690  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2691    private final String peerId;
2692    private final Supplier<String> getOperation;
2693
2694    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2695      this.peerId = peerId;
2696      this.getOperation = getOperation;
2697    }
2698
2699    String getDescription() {
2700      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2701    }
2702
2703    @Override
2704    void onFinished() {
2705      LOG.info(getDescription() + " completed");
2706    }
2707
2708    @Override
2709    void onError(Throwable error) {
2710      LOG.info(getDescription() + " failed with " + error.getMessage());
2711    }
2712  }
2713
2714  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2715    CompletableFuture<Void> future = new CompletableFuture<>();
2716    addListener(procFuture, (procId, error) -> {
2717      if (error != null) {
2718        future.completeExceptionally(error);
2719        return;
2720      }
2721      getProcedureResult(procId, future, 0);
2722    });
2723    return future;
2724  }
2725
2726  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2727    addListener(
2728      this.<GetProcedureResultResponse> newMasterCaller()
2729        .action((controller, stub) -> this
2730          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2731            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2732            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2733        .call(),
2734      (response, error) -> {
2735        if (error != null) {
2736          LOG.warn("failed to get the procedure result procId={}", procId,
2737            ConnectionUtils.translateException(error));
2738          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2739            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2740          return;
2741        }
2742        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2743          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2744            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2745          return;
2746        }
2747        if (response.hasException()) {
2748          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2749          future.completeExceptionally(ioe);
2750        } else {
2751          future.complete(null);
2752        }
2753      });
2754  }
2755
2756  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2757    CompletableFuture<T> future = new CompletableFuture<>();
2758    future.completeExceptionally(error);
2759    return future;
2760  }
2761
2762  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2763    if (error != null) {
2764      future.completeExceptionally(error);
2765      return true;
2766    }
2767    return false;
2768  }
2769
2770  @Override
2771  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2772    return getClusterMetrics(EnumSet.allOf(Option.class));
2773  }
2774
2775  @Override
2776  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2777    return this
2778        .<ClusterMetrics> newMasterCaller()
2779        .action(
2780          (controller, stub) -> this
2781              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2782                stub, RequestConverter.buildGetClusterStatusRequest(options),
2783                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2784                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2785  }
2786
2787  @Override
2788  public CompletableFuture<Void> shutdown() {
2789    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2790      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2791        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2792        resp -> null))
2793      .call();
2794  }
2795
2796  @Override
2797  public CompletableFuture<Void> stopMaster() {
2798    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2799      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2800        controller, stub, StopMasterRequest.newBuilder().build(),
2801        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2802      .call();
2803  }
2804
2805  @Override
2806  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2807    StopServerRequest request = RequestConverter
2808      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2809    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2810      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2811        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2812        resp -> null))
2813      .serverName(serverName).call();
2814  }
2815
2816  @Override
2817  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2818    return this
2819        .<Void> newAdminCaller()
2820        .action(
2821          (controller, stub) -> this
2822              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2823                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2824                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2825        .serverName(serverName).call();
2826  }
2827
2828  @Override
2829  public CompletableFuture<Void> updateConfiguration() {
2830    CompletableFuture<Void> future = new CompletableFuture<Void>();
2831    addListener(
2832      getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
2833      (status, err) -> {
2834        if (err != null) {
2835          future.completeExceptionally(err);
2836        } else {
2837          List<CompletableFuture<Void>> futures = new ArrayList<>();
2838          status.getLiveServerMetrics().keySet()
2839            .forEach(server -> futures.add(updateConfiguration(server)));
2840          futures.add(updateConfiguration(status.getMasterName()));
2841          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2842          addListener(
2843            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2844            (result, err2) -> {
2845              if (err2 != null) {
2846                future.completeExceptionally(err2);
2847              } else {
2848                future.complete(result);
2849              }
2850            });
2851        }
2852      });
2853    return future;
2854  }
2855
2856  @Override
2857  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2858    return this
2859        .<Void> newAdminCaller()
2860        .action(
2861          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2862            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2863            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2864        .serverName(serverName).call();
2865  }
2866
2867  @Override
2868  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2869    return this
2870        .<Void> newAdminCaller()
2871        .action(
2872          (controller, stub) -> this
2873              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2874                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2875                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2876        .serverName(serverName).call();
2877  }
2878
2879  @Override
2880  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2881    return this
2882        .<List<SecurityCapability>> newMasterCaller()
2883        .action(
2884          (controller, stub) -> this
2885              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
2886                controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
2887                    done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
2888                    .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
2889  }
2890
2891  @Override
2892  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2893    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2894  }
2895
2896  @Override
2897  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2898      TableName tableName) {
2899    Preconditions.checkNotNull(tableName,
2900      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2901    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2902  }
2903
2904  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2905      ServerName serverName) {
2906    return this.<List<RegionMetrics>> newAdminCaller()
2907        .action((controller, stub) -> this
2908            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2909              adminCall(controller, stub, request, (s, c, req, done) ->
2910                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2911        .serverName(serverName).call();
2912  }
2913
2914  @Override
2915  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2916    return this
2917        .<Boolean> newMasterCaller()
2918        .action(
2919          (controller, stub) -> this
2920              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2921                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2922                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2923                resp -> resp.getInMaintenanceMode())).call();
2924  }
2925
2926  @Override
2927  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2928      CompactType compactType) {
2929    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2930
2931    switch (compactType) {
2932      case MOB:
2933        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
2934          if (err != null) {
2935            future.completeExceptionally(err);
2936            return;
2937          }
2938          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2939
2940          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2941            .action((controller, stub) -> this
2942              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2943                controller, stub,
2944                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2945                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2946            .call(), (resp2, err2) -> {
2947              if (err2 != null) {
2948                future.completeExceptionally(err2);
2949              } else {
2950                if (resp2.hasCompactionState()) {
2951                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2952                } else {
2953                  future.complete(CompactionState.NONE);
2954                }
2955              }
2956            });
2957        });
2958        break;
2959      case NORMAL:
2960        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2961          if (err != null) {
2962            future.completeExceptionally(err);
2963            return;
2964          }
2965          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
2966          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2967          locations.stream().filter(loc -> loc.getServerName() != null)
2968            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2969            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2970              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
2971                // If any region compaction state is MAJOR_AND_MINOR
2972                // the table compaction state is MAJOR_AND_MINOR, too.
2973                if (err2 != null) {
2974                  future.completeExceptionally(unwrapCompletionException(err2));
2975                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
2976                  future.complete(regionState);
2977                } else {
2978                  regionStates.add(regionState);
2979                }
2980              }));
2981            });
2982          addListener(
2983            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2984            (ret, err3) -> {
2985              // If future not completed, check all regions's compaction state
2986              if (!future.isCompletedExceptionally() && !future.isDone()) {
2987                CompactionState state = CompactionState.NONE;
2988                for (CompactionState regionState : regionStates) {
2989                  switch (regionState) {
2990                    case MAJOR:
2991                      if (state == CompactionState.MINOR) {
2992                        future.complete(CompactionState.MAJOR_AND_MINOR);
2993                      } else {
2994                        state = CompactionState.MAJOR;
2995                      }
2996                      break;
2997                    case MINOR:
2998                      if (state == CompactionState.MAJOR) {
2999                        future.complete(CompactionState.MAJOR_AND_MINOR);
3000                      } else {
3001                        state = CompactionState.MINOR;
3002                      }
3003                      break;
3004                    case NONE:
3005                    default:
3006                  }
3007                }
3008                if (!future.isDone()) {
3009                  future.complete(state);
3010                }
3011              }
3012            });
3013        });
3014        break;
3015      default:
3016        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3017    }
3018
3019    return future;
3020  }
3021
3022  @Override
3023  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3024    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3025    addListener(getRegionLocation(regionName), (location, err) -> {
3026      if (err != null) {
3027        future.completeExceptionally(err);
3028        return;
3029      }
3030      ServerName serverName = location.getServerName();
3031      if (serverName == null) {
3032        future
3033          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3034        return;
3035      }
3036      addListener(
3037        this.<GetRegionInfoResponse> newAdminCaller()
3038          .action((controller, stub) -> this
3039            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3040              controller, stub,
3041              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3042                true),
3043              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3044          .serverName(serverName).call(),
3045        (resp2, err2) -> {
3046          if (err2 != null) {
3047            future.completeExceptionally(err2);
3048          } else {
3049            if (resp2.hasCompactionState()) {
3050              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3051            } else {
3052              future.complete(CompactionState.NONE);
3053            }
3054          }
3055        });
3056    });
3057    return future;
3058  }
3059
3060  @Override
3061  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3062    MajorCompactionTimestampRequest request =
3063        MajorCompactionTimestampRequest.newBuilder()
3064            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3065    return this
3066        .<Optional<Long>> newMasterCaller()
3067        .action(
3068          (controller, stub) -> this
3069              .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
3070                controller, stub, request,
3071                (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3072                ProtobufUtil::toOptionalTimestamp)).call();
3073  }
3074
3075  @Override
3076  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3077      byte[] regionName) {
3078    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3079    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3080    addListener(getRegionInfo(regionName), (region, err) -> {
3081      if (err != null) {
3082        future.completeExceptionally(err);
3083        return;
3084      }
3085      MajorCompactionTimestampForRegionRequest.Builder builder =
3086        MajorCompactionTimestampForRegionRequest.newBuilder();
3087      builder.setRegion(
3088        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3089      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3090        .<MajorCompactionTimestampForRegionRequest,
3091        MajorCompactionTimestampResponse, Optional<Long>> call(
3092          controller, stub, builder.build(),
3093          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3094          ProtobufUtil::toOptionalTimestamp))
3095        .call(), (timestamp, err2) -> {
3096          if (err2 != null) {
3097            future.completeExceptionally(err2);
3098          } else {
3099            future.complete(timestamp);
3100          }
3101        });
3102    });
3103    return future;
3104  }
3105
3106  @Override
3107  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3108      List<String> serverNamesList) {
3109    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3110    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3111      if (err != null) {
3112        future.completeExceptionally(err);
3113        return;
3114      }
3115      // Accessed by multiple threads.
3116      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3117      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3118      serverNames.stream().forEach(serverName -> {
3119        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3120          if (err2 != null) {
3121            future.completeExceptionally(unwrapCompletionException(err2));
3122          } else {
3123            serverStates.put(serverName, serverState);
3124          }
3125        }));
3126      });
3127      addListener(
3128        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3129        (ret, err3) -> {
3130          if (!future.isCompletedExceptionally()) {
3131            if (err3 != null) {
3132              future.completeExceptionally(err3);
3133            } else {
3134              future.complete(serverStates);
3135            }
3136          }
3137        });
3138    });
3139    return future;
3140  }
3141
3142  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3143    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3144    if (serverNamesList.isEmpty()) {
3145      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3146        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
3147      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3148        if (err != null) {
3149          future.completeExceptionally(err);
3150        } else {
3151          future.complete(new ArrayList<>(clusterMetrics.getLiveServerMetrics().keySet()));
3152        }
3153      });
3154      return future;
3155    } else {
3156      List<ServerName> serverList = new ArrayList<>();
3157      for (String regionServerName : serverNamesList) {
3158        ServerName serverName = null;
3159        try {
3160          serverName = ServerName.valueOf(regionServerName);
3161        } catch (Exception e) {
3162          future.completeExceptionally(
3163            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3164        }
3165        if (serverName == null) {
3166          future.completeExceptionally(
3167            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3168        }
3169      }
3170      future.complete(serverList);
3171    }
3172    return future;
3173  }
3174
3175  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3176    return this
3177        .<Boolean>newAdminCaller()
3178        .serverName(serverName)
3179        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3180            Boolean>adminCall(controller, stub,
3181            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3182            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3183  }
3184
3185  @Override
3186  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3187    return this.<Boolean> newMasterCaller()
3188      .action((controller, stub) -> this
3189        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3190          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3191          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3192          (resp) -> resp.getPrevBalanceValue()))
3193      .call();
3194  }
3195
3196  @Override
3197  public CompletableFuture<Boolean> balance(boolean forcible) {
3198    return this
3199        .<Boolean> newMasterCaller()
3200        .action(
3201          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3202            stub, RequestConverter.buildBalanceRequest(forcible),
3203            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3204  }
3205
3206  @Override
3207  public CompletableFuture<Boolean> isBalancerEnabled() {
3208    return this
3209        .<Boolean> newMasterCaller()
3210        .action(
3211          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
3212            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3213            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3214        .call();
3215  }
3216
3217  @Override
3218  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3219    return this
3220        .<Boolean> newMasterCaller()
3221        .action(
3222          (controller, stub) -> this
3223              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3224                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3225                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3226                    .getPrevNormalizerValue())).call();
3227  }
3228
3229  @Override
3230  public CompletableFuture<Boolean> isNormalizerEnabled() {
3231    return this
3232        .<Boolean> newMasterCaller()
3233        .action(
3234          (controller, stub) -> this
3235              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3236                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3237                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3238                (resp) -> resp.getEnabled())).call();
3239  }
3240
3241  @Override
3242  public CompletableFuture<Boolean> normalize() {
3243    return this
3244        .<Boolean> newMasterCaller()
3245        .action(
3246          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3247            controller, stub, RequestConverter.buildNormalizeRequest(),
3248            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3249        .call();
3250  }
3251
3252  @Override
3253  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3254    return this
3255        .<Boolean> newMasterCaller()
3256        .action(
3257          (controller, stub) -> this
3258              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3259                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3260                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3261                    .getPrevValue())).call();
3262  }
3263
3264  @Override
3265  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3266    return this
3267        .<Boolean> newMasterCaller()
3268        .action(
3269          (controller, stub) -> this
3270              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3271                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3272                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3273        .call();
3274  }
3275
3276  @Override
3277  public CompletableFuture<Boolean> runCleanerChore() {
3278    return this
3279        .<Boolean> newMasterCaller()
3280        .action(
3281          (controller, stub) -> this
3282              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3283                RequestConverter.buildRunCleanerChoreRequest(),
3284                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3285                (resp) -> resp.getCleanerChoreRan())).call();
3286  }
3287
3288  @Override
3289  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3290    return this
3291        .<Boolean> newMasterCaller()
3292        .action(
3293          (controller, stub) -> this
3294              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3295                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3296                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3297                    .getPrevValue())).call();
3298  }
3299
3300  @Override
3301  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3302    return this
3303        .<Boolean> newMasterCaller()
3304        .action(
3305          (controller, stub) -> this
3306              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3307                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3308                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3309                    .getValue())).call();
3310  }
3311
3312  @Override
3313  public CompletableFuture<Integer> runCatalogJanitor() {
3314    return this
3315        .<Integer> newMasterCaller()
3316        .action(
3317          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3318            controller, stub, RequestConverter.buildCatalogScanRequest(),
3319            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3320        .call();
3321  }
3322
3323  @Override
3324  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3325      ServiceCaller<S, R> callable) {
3326    MasterCoprocessorRpcChannelImpl channel =
3327        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3328    S stub = stubMaker.apply(channel);
3329    CompletableFuture<R> future = new CompletableFuture<>();
3330    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3331    callable.call(stub, controller, resp -> {
3332      if (controller.failed()) {
3333        future.completeExceptionally(controller.getFailed());
3334      } else {
3335        future.complete(resp);
3336      }
3337    });
3338    return future;
3339  }
3340
3341  @Override
3342  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3343      ServiceCaller<S, R> callable, ServerName serverName) {
3344    RegionServerCoprocessorRpcChannelImpl channel =
3345        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3346          serverName));
3347    S stub = stubMaker.apply(channel);
3348    CompletableFuture<R> future = new CompletableFuture<>();
3349    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3350    callable.call(stub, controller, resp -> {
3351      if (controller.failed()) {
3352        future.completeExceptionally(controller.getFailed());
3353      } else {
3354        future.complete(resp);
3355      }
3356    });
3357    return future;
3358  }
3359
3360  @Override
3361  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3362    return this.<List<ServerName>> newMasterCaller()
3363      .action((controller, stub) -> this
3364        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3365          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3366          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3367          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3368      .call();
3369  }
3370
3371  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3372    return this.connection.callerFactory.<T> serverRequest()
3373      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3374      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3375      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3376      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3377  }
3378
3379  @Override
3380  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3381    if (tableName == null) {
3382      return failedFuture(new IllegalArgumentException("Table name is null"));
3383    }
3384    CompletableFuture<Void> future = new CompletableFuture<>();
3385    addListener(tableExists(tableName), (exist, err) -> {
3386      if (err != null) {
3387        future.completeExceptionally(err);
3388        return;
3389      }
3390      if (!exist) {
3391        future.completeExceptionally(new TableNotFoundException(
3392          "Table '" + tableName.getNameAsString() + "' does not exists."));
3393        return;
3394      }
3395      addListener(getTableSplits(tableName), (splits, err1) -> {
3396        if (err1 != null) {
3397          future.completeExceptionally(err1);
3398        } else {
3399          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3400            if (err2 != null) {
3401              future.completeExceptionally(err2);
3402            } else {
3403              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3404                if (err3 != null) {
3405                  future.completeExceptionally(err3);
3406                } else {
3407                  future.complete(result3);
3408                }
3409              });
3410            }
3411          });
3412        }
3413      });
3414    });
3415    return future;
3416  }
3417
3418  @Override
3419  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3420    if (tableName == null) {
3421      return failedFuture(new IllegalArgumentException("Table name is null"));
3422    }
3423    CompletableFuture<Void> future = new CompletableFuture<>();
3424    addListener(tableExists(tableName), (exist, err) -> {
3425      if (err != null) {
3426        future.completeExceptionally(err);
3427        return;
3428      }
3429      if (!exist) {
3430        future.completeExceptionally(new TableNotFoundException(
3431          "Table '" + tableName.getNameAsString() + "' does not exists."));
3432        return;
3433      }
3434      addListener(setTableReplication(tableName, false), (result, err2) -> {
3435        if (err2 != null) {
3436          future.completeExceptionally(err2);
3437        } else {
3438          future.complete(result);
3439        }
3440      });
3441    });
3442    return future;
3443  }
3444
3445  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3446    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3447    addListener(
3448      getRegions(tableName).thenApply(regions -> regions.stream()
3449        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3450      (regions, err2) -> {
3451        if (err2 != null) {
3452          future.completeExceptionally(err2);
3453          return;
3454        }
3455        if (regions.size() == 1) {
3456          future.complete(null);
3457        } else {
3458          byte[][] splits = new byte[regions.size() - 1][];
3459          for (int i = 1; i < regions.size(); i++) {
3460            splits[i - 1] = regions.get(i).getStartKey();
3461          }
3462          future.complete(splits);
3463        }
3464      });
3465    return future;
3466  }
3467
3468  /**
3469   * Connect to peer and check the table descriptor on peer:
3470   * <ol>
3471   * <li>Create the same table on peer when not exist.</li>
3472   * <li>Throw an exception if the table already has replication enabled on any of the column
3473   * families.</li>
3474   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3475   * </ol>
3476   * @param tableName name of the table to sync to the peer
3477   * @param splits table split keys
3478   */
3479  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3480      byte[][] splits) {
3481    CompletableFuture<Void> future = new CompletableFuture<>();
3482    addListener(listReplicationPeers(), (peers, err) -> {
3483      if (err != null) {
3484        future.completeExceptionally(err);
3485        return;
3486      }
3487      if (peers == null || peers.size() <= 0) {
3488        future.completeExceptionally(
3489          new IllegalArgumentException("Found no peer cluster for replication."));
3490        return;
3491      }
3492      List<CompletableFuture<Void>> futures = new ArrayList<>();
3493      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3494        .forEach(peer -> {
3495          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3496        });
3497      addListener(
3498        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3499        (result, err2) -> {
3500          if (err2 != null) {
3501            future.completeExceptionally(err2);
3502          } else {
3503            future.complete(result);
3504          }
3505        });
3506    });
3507    return future;
3508  }
3509
3510  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3511      ReplicationPeerDescription peer) {
3512    Configuration peerConf = null;
3513    try {
3514      peerConf =
3515        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3516    } catch (IOException e) {
3517      return failedFuture(e);
3518    }
3519    CompletableFuture<Void> future = new CompletableFuture<>();
3520    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3521      if (err != null) {
3522        future.completeExceptionally(err);
3523        return;
3524      }
3525      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3526        if (err1 != null) {
3527          future.completeExceptionally(err1);
3528          return;
3529        }
3530        AsyncAdmin peerAdmin = conn.getAdmin();
3531        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3532          if (err2 != null) {
3533            future.completeExceptionally(err2);
3534            return;
3535          }
3536          if (!exist) {
3537            CompletableFuture<Void> createTableFuture = null;
3538            if (splits == null) {
3539              createTableFuture = peerAdmin.createTable(tableDesc);
3540            } else {
3541              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3542            }
3543            addListener(createTableFuture, (result, err3) -> {
3544              if (err3 != null) {
3545                future.completeExceptionally(err3);
3546              } else {
3547                future.complete(result);
3548              }
3549            });
3550          } else {
3551            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3552              (result, err4) -> {
3553                if (err4 != null) {
3554                  future.completeExceptionally(err4);
3555                } else {
3556                  future.complete(result);
3557                }
3558              });
3559          }
3560        });
3561      });
3562    });
3563    return future;
3564  }
3565
3566  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3567      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3568    CompletableFuture<Void> future = new CompletableFuture<>();
3569    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3570      if (err != null) {
3571        future.completeExceptionally(err);
3572        return;
3573      }
3574      if (peerTableDesc == null) {
3575        future.completeExceptionally(
3576          new IllegalArgumentException("Failed to get table descriptor for table " +
3577            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3578        return;
3579      }
3580      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3581        future.completeExceptionally(new IllegalArgumentException(
3582          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3583            ", but the table descriptors are not same when compared with source cluster." +
3584            " Thus can not enable the table's replication switch."));
3585        return;
3586      }
3587      future.complete(null);
3588    });
3589    return future;
3590  }
3591
3592  /**
3593   * Set the table's replication switch if the table's replication switch is already not set.
3594   * @param tableName name of the table
3595   * @param enableRep is replication switch enable or disable
3596   */
3597  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3598    CompletableFuture<Void> future = new CompletableFuture<>();
3599    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3600      if (err != null) {
3601        future.completeExceptionally(err);
3602        return;
3603      }
3604      if (!tableDesc.matchReplicationScope(enableRep)) {
3605        int scope =
3606          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3607        TableDescriptor newTableDesc =
3608          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3609        addListener(modifyTable(newTableDesc), (result, err2) -> {
3610          if (err2 != null) {
3611            future.completeExceptionally(err2);
3612          } else {
3613            future.complete(result);
3614          }
3615        });
3616      } else {
3617        future.complete(null);
3618      }
3619    });
3620    return future;
3621  }
3622
3623  @Override
3624  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3625    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3626    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3627      if (err != null) {
3628        future.completeExceptionally(err);
3629        return;
3630      }
3631      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3632        locations.stream().filter(l -> l.getRegion() != null)
3633          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3634          .collect(Collectors.groupingBy(l -> l.getServerName(),
3635            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3636      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3637      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3638      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3639        futures
3640          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3641            if (err2 != null) {
3642              future.completeExceptionally(unwrapCompletionException(err2));
3643            } else {
3644              aggregator.append(stats);
3645            }
3646          }));
3647      }
3648      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3649        (ret, err3) -> {
3650          if (err3 != null) {
3651            future.completeExceptionally(unwrapCompletionException(err3));
3652          } else {
3653            future.complete(aggregator.sum());
3654          }
3655        });
3656    });
3657    return future;
3658  }
3659
3660  @Override
3661  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3662      boolean preserveSplits) {
3663    CompletableFuture<Void> future = new CompletableFuture<>();
3664    addListener(tableExists(tableName), (exist, err) -> {
3665      if (err != null) {
3666        future.completeExceptionally(err);
3667        return;
3668      }
3669      if (!exist) {
3670        future.completeExceptionally(new TableNotFoundException(tableName));
3671        return;
3672      }
3673      addListener(tableExists(newTableName), (exist1, err1) -> {
3674        if (err1 != null) {
3675          future.completeExceptionally(err1);
3676          return;
3677        }
3678        if (exist1) {
3679          future.completeExceptionally(new TableExistsException(newTableName));
3680          return;
3681        }
3682        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3683          if (err2 != null) {
3684            future.completeExceptionally(err2);
3685            return;
3686          }
3687          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3688          if (preserveSplits) {
3689            addListener(getTableSplits(tableName), (splits, err3) -> {
3690              if (err3 != null) {
3691                future.completeExceptionally(err3);
3692              } else {
3693                addListener(
3694                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3695                  (result, err4) -> {
3696                    if (err4 != null) {
3697                      future.completeExceptionally(err4);
3698                    } else {
3699                      future.complete(result);
3700                    }
3701                  });
3702              }
3703            });
3704          } else {
3705            addListener(createTable(newTableDesc), (result, err5) -> {
3706              if (err5 != null) {
3707                future.completeExceptionally(err5);
3708              } else {
3709                future.complete(result);
3710              }
3711            });
3712          }
3713        });
3714      });
3715    });
3716    return future;
3717  }
3718
3719  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3720      List<RegionInfo> hris) {
3721    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3722      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3723        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3724        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3725        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3726      .serverName(serverName).call();
3727  }
3728
3729  @Override
3730  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3731    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3732        .action((controller, stub) -> this
3733            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3734              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3735              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3736              resp -> resp.getPreviousRpcThrottleEnabled()))
3737        .call();
3738    return future;
3739  }
3740
3741  @Override
3742  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3743    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3744        .action((controller, stub) -> this
3745            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3746              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3747              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3748              resp -> resp.getRpcThrottleEnabled()))
3749        .call();
3750    return future;
3751  }
3752
3753  @Override
3754  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3755    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3756        .action((controller, stub) -> this
3757            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3758              controller, stub,
3759              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3760                  .build(),
3761              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3762              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3763        .call();
3764    return future;
3765  }
3766
3767  @Override
3768  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3769    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3770      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3771      Map<TableName, Long>> call(controller, stub,
3772        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3773        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3774        resp -> resp.getSizesList().stream().collect(Collectors
3775          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3776      .call();
3777  }
3778
3779  @Override
3780  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3781      ServerName serverName) {
3782    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3783      .action((controller, stub) -> this
3784        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3785        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3786          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3787          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3788          resp -> resp.getSnapshotsList().stream()
3789            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3790              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3791      .serverName(serverName).call();
3792  }
3793
3794  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3795      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3796    return this.<SpaceQuotaSnapshot> newMasterCaller()
3797      .action((controller, stub) -> this
3798        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3799          RequestConverter.buildGetQuotaStatesRequest(),
3800          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3801      .call();
3802  }
3803
3804  @Override
3805  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3806    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3807      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3808      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3809  }
3810
3811  @Override
3812  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3813    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3814    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3815      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3816      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3817  }
3818
3819  @Override
3820  public CompletableFuture<Void> grant(UserPermission userPermission,
3821      boolean mergeExistingPermissions) {
3822    return this.<Void> newMasterCaller()
3823        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3824          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3825          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3826        .call();
3827  }
3828
3829  @Override
3830  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3831    return this.<Void> newMasterCaller()
3832        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3833          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3834          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3835        .call();
3836  }
3837
3838  @Override
3839  public CompletableFuture<List<UserPermission>>
3840      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3841    return this.<List<UserPermission>> newMasterCaller().action((controller,
3842        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3843            List<UserPermission>> call(controller, stub,
3844              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3845              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3846              resp -> resp.getUserPermissionList().stream()
3847                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3848                .collect(Collectors.toList())))
3849        .call();
3850  }
3851
3852  @Override
3853  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3854      List<Permission> permissions) {
3855    return this.<List<Boolean>> newMasterCaller()
3856        .action((controller, stub) -> this
3857            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3858              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3859              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3860              resp -> resp.getHasUserPermissionList()))
3861        .call();
3862  }
3863
3864  @Override
3865  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3866      final boolean sync) {
3867    return this.<Boolean>newMasterCaller()
3868        .action((controller, stub) -> this
3869            .call(controller, stub,
3870                RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3871                MasterService.Interface::switchSnapshotCleanup,
3872                SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
3873        .call();
3874  }
3875
3876  @Override
3877  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3878    return this.<Boolean>newMasterCaller()
3879        .action((controller, stub) -> this
3880            .call(controller, stub,
3881                RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3882                MasterService.Interface::isSnapshotCleanupEnabled,
3883                IsSnapshotCleanupEnabledResponse::getEnabled))
3884        .call();
3885  }
3886
3887}