001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import com.google.protobuf.Message;
026import com.google.protobuf.RpcChannel;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MetaTableAccessor;
060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
061import org.apache.hadoop.hbase.NamespaceDescriptor;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.UnknownRegionException;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.Scan.ReadType;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.DeserializationException;
080import org.apache.hadoop.hbase.ipc.HBaseRpcController;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.replication.SyncReplicationState;
089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
090import org.apache.hadoop.hbase.security.access.Permission;
091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
092import org.apache.hadoop.hbase.security.access.UserPermission;
093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.util.Bytes;
097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
098import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
107import org.apache.hbase.thirdparty.io.netty.util.Timeout;
108import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
109
110import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
111import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
210    .IsSnapshotCleanupEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
262    .SetSnapshotCleanupResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
305
306/**
307 * The implementation of AsyncAdmin.
308 * <p>
309 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
310 * be finished inside the rpc framework thread, which means that the callbacks registered to the
311 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
312 * this class should not try to do time consuming tasks in the callbacks.
313 * @since 2.0.0
314 * @see AsyncHBaseAdmin
315 * @see AsyncConnection#getAdmin()
316 * @see AsyncConnection#getAdminBuilder()
317 */
318@InterfaceAudience.Private
319class RawAsyncHBaseAdmin implements AsyncAdmin {
320  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
321
322  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
323
324  private final AsyncConnectionImpl connection;
325
326  private final HashedWheelTimer retryTimer;
327
328  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
329
330  private final long rpcTimeoutNs;
331
332  private final long operationTimeoutNs;
333
334  private final long pauseNs;
335
336  private final long pauseForCQTBENs;
337
338  private final int maxAttempts;
339
340  private final int startLogErrorsCnt;
341
342  private final NonceGenerator ng;
343
344  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
345      AsyncAdminBuilderBase builder) {
346    this.connection = connection;
347    this.retryTimer = retryTimer;
348    this.metaTable = connection.getTable(META_TABLE_NAME);
349    this.rpcTimeoutNs = builder.rpcTimeoutNs;
350    this.operationTimeoutNs = builder.operationTimeoutNs;
351    this.pauseNs = builder.pauseNs;
352    if (builder.pauseForCQTBENs < builder.pauseNs) {
353      LOG.warn(
354        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
355          " the normal pause value {} ms, use the greater one instead",
356        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
357        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
358      this.pauseForCQTBENs = builder.pauseNs;
359    } else {
360      this.pauseForCQTBENs = builder.pauseForCQTBENs;
361    }
362    this.maxAttempts = builder.maxAttempts;
363    this.startLogErrorsCnt = builder.startLogErrorsCnt;
364    this.ng = connection.getNonceGenerator();
365  }
366
367  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
368    return this.connection.callerFactory.<T> masterRequest()
369      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
370      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
371      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
372      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
373  }
374
375  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
376    return this.connection.callerFactory.<T> adminRequest()
377      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
378      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
379      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
380      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
381  }
382
383  @FunctionalInterface
384  private interface MasterRpcCall<RESP, REQ> {
385    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
386        RpcCallback<RESP> done);
387  }
388
389  @FunctionalInterface
390  private interface AdminRpcCall<RESP, REQ> {
391    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
392        RpcCallback<RESP> done);
393  }
394
395  @FunctionalInterface
396  private interface Converter<D, S> {
397    D convert(S src) throws IOException;
398  }
399
400  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
401      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
402      Converter<RESP, PRESP> respConverter) {
403    CompletableFuture<RESP> future = new CompletableFuture<>();
404    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
405
406      @Override
407      public void run(PRESP resp) {
408        if (controller.failed()) {
409          future.completeExceptionally(controller.getFailed());
410        } else {
411          try {
412            future.complete(respConverter.convert(resp));
413          } catch (IOException e) {
414            future.completeExceptionally(e);
415          }
416        }
417      }
418    });
419    return future;
420  }
421
422  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
423      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
424      Converter<RESP, PRESP> respConverter) {
425    CompletableFuture<RESP> future = new CompletableFuture<>();
426    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
427
428      @Override
429      public void run(PRESP resp) {
430        if (controller.failed()) {
431          future.completeExceptionally(controller.getFailed());
432        } else {
433          try {
434            future.complete(respConverter.convert(resp));
435          } catch (IOException e) {
436            future.completeExceptionally(e);
437          }
438        }
439      }
440    });
441    return future;
442  }
443
444  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
445      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
446      ProcedureBiConsumer consumer) {
447    return procedureCall(b -> {
448    }, preq, rpcCall, respConverter, consumer);
449  }
450
451  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
452      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
453      ProcedureBiConsumer consumer) {
454    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
455  }
456
457  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
458      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
459      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
460      ProcedureBiConsumer consumer) {
461    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
462        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
463    prioritySetter.accept(builder);
464    CompletableFuture<Long> procFuture = builder.call();
465    CompletableFuture<Void> future = waitProcedureResult(procFuture);
466    addListener(future, consumer);
467    return future;
468  }
469
470  @FunctionalInterface
471  private interface TableOperator {
472    CompletableFuture<Void> operate(TableName table);
473  }
474
475  @Override
476  public CompletableFuture<Boolean> tableExists(TableName tableName) {
477    if (TableName.isMetaTableName(tableName)) {
478      return CompletableFuture.completedFuture(true);
479    }
480    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
481  }
482
483  @Override
484  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
485    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
486      includeSysTables));
487  }
488
489  /**
490   * {@link #listTableDescriptors(boolean)}
491   */
492  @Override
493  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
494      boolean includeSysTables) {
495    Preconditions.checkNotNull(pattern,
496      "pattern is null. If you don't specify a pattern, "
497          + "use listTableDescriptors(boolean) instead");
498    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
499      includeSysTables));
500  }
501
502  @Override
503  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
504    Preconditions.checkNotNull(tableNames,
505      "tableNames is null. If you don't specify tableNames, "
506          + "use listTableDescriptors(boolean) instead");
507    if (tableNames.isEmpty()) {
508      return CompletableFuture.completedFuture(Collections.emptyList());
509    }
510    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
511  }
512
513  private CompletableFuture<List<TableDescriptor>>
514      getTableDescriptors(GetTableDescriptorsRequest request) {
515    return this.<List<TableDescriptor>> newMasterCaller()
516        .action((controller, stub) -> this
517            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
518              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
519              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
520        .call();
521  }
522
523  @Override
524  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
525    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
526  }
527
528  @Override
529  public CompletableFuture<List<TableName>>
530      listTableNames(Pattern pattern, boolean includeSysTables) {
531    Preconditions.checkNotNull(pattern,
532        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
533    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
534  }
535
536  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
537    return this
538        .<List<TableName>> newMasterCaller()
539        .action(
540          (controller, stub) -> this
541              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
542                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
543                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
544  }
545
546  @Override
547  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
548    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
549        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
550        List<TableDescriptor>> call(
551          controller, stub,
552          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
553          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
554          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
555        .call();
556  }
557
558  @Override
559  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
560    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
561        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
562        List<TableName>> call(
563          controller, stub,
564          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
565          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
566          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
567        .call();
568  }
569
570  @Override
571  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
572    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
573    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
574      .action((controller, stub) -> this
575        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
576          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
577          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
578          (resp) -> resp.getTableSchemaList()))
579      .call(), (tableSchemas, error) -> {
580        if (error != null) {
581          future.completeExceptionally(error);
582          return;
583        }
584        if (!tableSchemas.isEmpty()) {
585          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
586        } else {
587          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
588        }
589      });
590    return future;
591  }
592
593  @Override
594  public CompletableFuture<Void> createTable(TableDescriptor desc) {
595    return createTable(desc.getTableName(),
596      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
597  }
598
599  @Override
600  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
601      int numRegions) {
602    try {
603      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
604    } catch (IllegalArgumentException e) {
605      return failedFuture(e);
606    }
607  }
608
609  @Override
610  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
611    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
612        + " use createTable(TableDescriptor) instead");
613    try {
614      verifySplitKeys(splitKeys);
615      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
616        splitKeys, ng.getNonceGroup(), ng.newNonce()));
617    } catch (IllegalArgumentException e) {
618      return failedFuture(e);
619    }
620  }
621
622  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
623    Preconditions.checkNotNull(tableName, "table name is null");
624    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
625      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
626      new CreateTableProcedureBiConsumer(tableName));
627  }
628
629  @Override
630  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
631    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
632      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
633        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
634      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
635  }
636
637  @Override
638  public CompletableFuture<Void> deleteTable(TableName tableName) {
639    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
640      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
641      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
642      new DeleteTableProcedureBiConsumer(tableName));
643  }
644
645  @Override
646  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
647    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
648      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
649        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
650      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
651  }
652
653  @Override
654  public CompletableFuture<Void> enableTable(TableName tableName) {
655    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
656      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
657      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
658      new EnableTableProcedureBiConsumer(tableName));
659  }
660
661  @Override
662  public CompletableFuture<Void> disableTable(TableName tableName) {
663    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
664      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
665      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
666      new DisableTableProcedureBiConsumer(tableName));
667  }
668
669  /**
670   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
671   * using passed parameters. Sets error or boolean result ('true' if table matches
672   * the passed-in targetState).
673   */
674  private static CompletableFuture<Boolean> completeCheckTableState(
675      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
676      TableState.State targetState, TableName tableName) {
677    if (error != null) {
678      future.completeExceptionally(error);
679    } else {
680      if (tableState != null) {
681        future.complete(tableState.inStates(targetState));
682      } else {
683        future.completeExceptionally(new TableNotFoundException(tableName));
684      }
685    }
686    return future;
687  }
688
689  @Override
690  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
691    if (TableName.isMetaTableName(tableName)) {
692      return CompletableFuture.completedFuture(true);
693    }
694    CompletableFuture<Boolean> future = new CompletableFuture<>();
695    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
696      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
697        TableState.State.ENABLED, tableName);
698    });
699    return future;
700  }
701
702  @Override
703  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
704    if (TableName.isMetaTableName(tableName)) {
705      return CompletableFuture.completedFuture(false);
706    }
707    CompletableFuture<Boolean> future = new CompletableFuture<>();
708    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
709      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
710        TableState.State.DISABLED, tableName);
711    });
712    return future;
713  }
714
715  @Override
716  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
717    if (TableName.isMetaTableName(tableName)) {
718      return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
719        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
720    }
721    CompletableFuture<Boolean> future = new CompletableFuture<>();
722    addListener(isTableEnabled(tableName), (enabled, error) -> {
723      if (error != null) {
724        if (error instanceof TableNotFoundException) {
725          future.complete(false);
726        } else {
727          future.completeExceptionally(error);
728        }
729        return;
730      }
731      if (!enabled) {
732        future.complete(false);
733      } else {
734        addListener(
735          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
736          (locations, error1) -> {
737            if (error1 != null) {
738              future.completeExceptionally(error1);
739              return;
740            }
741            List<HRegionLocation> notDeployedRegions = locations.stream()
742              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
743            if (notDeployedRegions.size() > 0) {
744              if (LOG.isDebugEnabled()) {
745                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
746              }
747              future.complete(false);
748              return;
749            }
750            future.complete(true);
751          });
752      }
753    });
754    return future;
755  }
756
757  @Override
758  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
759    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
760      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
761        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
762      new AddColumnFamilyProcedureBiConsumer(tableName));
763  }
764
765  @Override
766  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
767    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
768      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
769        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
770      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
771  }
772
773  @Override
774  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
775      ColumnFamilyDescriptor columnFamily) {
776    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
777      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
778        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
779      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
780  }
781
782  @Override
783  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
784    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
785      RequestConverter.buildCreateNamespaceRequest(descriptor),
786      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
787      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
788  }
789
790  @Override
791  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
792    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
793      RequestConverter.buildModifyNamespaceRequest(descriptor),
794      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
795      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
796  }
797
798  @Override
799  public CompletableFuture<Void> deleteNamespace(String name) {
800    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
801      RequestConverter.buildDeleteNamespaceRequest(name),
802      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
803      new DeleteNamespaceProcedureBiConsumer(name));
804  }
805
806  @Override
807  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
808    return this
809        .<NamespaceDescriptor> newMasterCaller()
810        .action(
811          (controller, stub) -> this
812              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
813                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
814                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
815                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
816  }
817
818  @Override
819  public CompletableFuture<List<String>> listNamespaces() {
820    return this
821        .<List<String>> newMasterCaller()
822        .action(
823          (controller, stub) -> this
824              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
825                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
826                  done) -> s.listNamespaces(c, req, done),
827                (resp) -> resp.getNamespaceNameList())).call();
828  }
829
830  @Override
831  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
832    return this
833        .<List<NamespaceDescriptor>> newMasterCaller()
834        .action(
835          (controller, stub) -> this
836              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
837                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
838                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
839                    .toNamespaceDescriptorList(resp))).call();
840  }
841
842  @Override
843  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
844    return this.<List<RegionInfo>> newAdminCaller()
845        .action((controller, stub) -> this
846            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
847              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
848              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
849              resp -> ProtobufUtil.getRegionInfos(resp)))
850        .serverName(serverName).call();
851  }
852
853  @Override
854  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
855    if (tableName.equals(META_TABLE_NAME)) {
856      return connection.registry.getMetaRegionLocation()
857        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
858          .collect(Collectors.toList()));
859    } else {
860      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
861        .thenApply(
862          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
863    }
864  }
865
866  @Override
867  public CompletableFuture<Void> flush(TableName tableName) {
868    CompletableFuture<Void> future = new CompletableFuture<>();
869    addListener(tableExists(tableName), (exists, err) -> {
870      if (err != null) {
871        future.completeExceptionally(err);
872      } else if (!exists) {
873        future.completeExceptionally(new TableNotFoundException(tableName));
874      } else {
875        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
876          if (err2 != null) {
877            future.completeExceptionally(err2);
878          } else if (!tableEnabled) {
879            future.completeExceptionally(new TableNotEnabledException(tableName));
880          } else {
881            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
882              new HashMap<>()), (ret, err3) -> {
883                if (err3 != null) {
884                  future.completeExceptionally(err3);
885                } else {
886                  future.complete(ret);
887                }
888              });
889          }
890        });
891      }
892    });
893    return future;
894  }
895
896  @Override
897  public CompletableFuture<Void> flushRegion(byte[] regionName) {
898    return flushRegionInternal(regionName, false).thenAccept(r -> {
899    });
900  }
901
902  /**
903   * This method is for internal use only, where we need the response of the flush.
904   * <p/>
905   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
906   * API.
907   */
908  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
909      boolean writeFlushWALMarker) {
910    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
911    addListener(getRegionLocation(regionName), (location, err) -> {
912      if (err != null) {
913        future.completeExceptionally(err);
914        return;
915      }
916      ServerName serverName = location.getServerName();
917      if (serverName == null) {
918        future
919          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
920        return;
921      }
922      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
923        if (err2 != null) {
924          future.completeExceptionally(err2);
925        } else {
926          future.complete(ret);
927        }
928      });
929    });
930    return future;
931  }
932
933  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
934      boolean writeFlushWALMarker) {
935    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
936      .action((controller, stub) -> this
937        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
938          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
939          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
940      .call();
941  }
942
943  @Override
944  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
945    CompletableFuture<Void> future = new CompletableFuture<>();
946    addListener(getRegions(sn), (hRegionInfos, err) -> {
947      if (err != null) {
948        future.completeExceptionally(err);
949        return;
950      }
951      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
952      if (hRegionInfos != null) {
953        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
954        })));
955      }
956      addListener(CompletableFuture.allOf(
957        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
958          if (err2 != null) {
959            future.completeExceptionally(err2);
960          } else {
961            future.complete(ret);
962          }
963        });
964    });
965    return future;
966  }
967
968  @Override
969  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
970    return compact(tableName, null, false, compactType);
971  }
972
973  @Override
974  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
975      CompactType compactType) {
976    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
977        + "If you don't specify a columnFamily, use compact(TableName) instead");
978    return compact(tableName, columnFamily, false, compactType);
979  }
980
981  @Override
982  public CompletableFuture<Void> compactRegion(byte[] regionName) {
983    return compactRegion(regionName, null, false);
984  }
985
986  @Override
987  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
988    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
989        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
990    return compactRegion(regionName, columnFamily, false);
991  }
992
993  @Override
994  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
995    return compact(tableName, null, true, compactType);
996  }
997
998  @Override
999  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1000      CompactType compactType) {
1001    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1002        + "If you don't specify a columnFamily, use compact(TableName) instead");
1003    return compact(tableName, columnFamily, true, compactType);
1004  }
1005
1006  @Override
1007  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1008    return compactRegion(regionName, null, true);
1009  }
1010
1011  @Override
1012  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1013    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1014        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1015    return compactRegion(regionName, columnFamily, true);
1016  }
1017
1018  @Override
1019  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1020    return compactRegionServer(sn, false);
1021  }
1022
1023  @Override
1024  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1025    return compactRegionServer(sn, true);
1026  }
1027
1028  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1029    CompletableFuture<Void> future = new CompletableFuture<>();
1030    addListener(getRegions(sn), (hRegionInfos, err) -> {
1031      if (err != null) {
1032        future.completeExceptionally(err);
1033        return;
1034      }
1035      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1036      if (hRegionInfos != null) {
1037        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1038      }
1039      addListener(CompletableFuture.allOf(
1040        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1041          if (err2 != null) {
1042            future.completeExceptionally(err2);
1043          } else {
1044            future.complete(ret);
1045          }
1046        });
1047    });
1048    return future;
1049  }
1050
1051  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1052      boolean major) {
1053    CompletableFuture<Void> future = new CompletableFuture<>();
1054    addListener(getRegionLocation(regionName), (location, err) -> {
1055      if (err != null) {
1056        future.completeExceptionally(err);
1057        return;
1058      }
1059      ServerName serverName = location.getServerName();
1060      if (serverName == null) {
1061        future
1062          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1063        return;
1064      }
1065      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1066        (ret, err2) -> {
1067          if (err2 != null) {
1068            future.completeExceptionally(err2);
1069          } else {
1070            future.complete(ret);
1071          }
1072        });
1073    });
1074    return future;
1075  }
1076
1077  /**
1078   * List all region locations for the specific table.
1079   */
1080  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1081    if (TableName.META_TABLE_NAME.equals(tableName)) {
1082      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1083      // For meta table, we use zk to fetch all locations.
1084      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
1085      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
1086        if (err != null) {
1087          future.completeExceptionally(err);
1088        } else if (metaRegions == null || metaRegions.isEmpty() ||
1089          metaRegions.getDefaultRegionLocation() == null) {
1090          future.completeExceptionally(new IOException("meta region does not found"));
1091        } else {
1092          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1093        }
1094        // close the registry.
1095        IOUtils.closeQuietly(registry);
1096      });
1097      return future;
1098    } else {
1099      // For non-meta table, we fetch all locations by scanning hbase:meta table
1100      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1101    }
1102  }
1103
1104  /**
1105   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1106   */
1107  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1108      CompactType compactType) {
1109    CompletableFuture<Void> future = new CompletableFuture<>();
1110
1111    switch (compactType) {
1112      case MOB:
1113        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
1114          if (err != null) {
1115            future.completeExceptionally(err);
1116            return;
1117          }
1118          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1119          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1120            if (err2 != null) {
1121              future.completeExceptionally(err2);
1122            } else {
1123              future.complete(ret);
1124            }
1125          });
1126        });
1127        break;
1128      case NORMAL:
1129        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1130          if (err != null) {
1131            future.completeExceptionally(err);
1132            return;
1133          }
1134          if (locations == null || locations.isEmpty()) {
1135            future.completeExceptionally(new TableNotFoundException(tableName));
1136          }
1137          CompletableFuture<?>[] compactFutures =
1138            locations.stream().filter(l -> l.getRegion() != null)
1139              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1140              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1141              .toArray(CompletableFuture<?>[]::new);
1142          // future complete unless all of the compact futures are completed.
1143          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1144            if (err2 != null) {
1145              future.completeExceptionally(err2);
1146            } else {
1147              future.complete(ret);
1148            }
1149          });
1150        });
1151        break;
1152      default:
1153        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1154    }
1155    return future;
1156  }
1157
1158  /**
1159   * Compact the region at specific region server.
1160   */
1161  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1162      final boolean major, byte[] columnFamily) {
1163    return this
1164        .<Void> newAdminCaller()
1165        .serverName(sn)
1166        .action(
1167          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1168            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1169              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1170            resp -> null)).call();
1171  }
1172
1173  private byte[] toEncodeRegionName(byte[] regionName) {
1174    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1175      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1176  }
1177
1178  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1179      CompletableFuture<TableName> result) {
1180    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1181      if (err != null) {
1182        result.completeExceptionally(err);
1183        return;
1184      }
1185      RegionInfo regionInfo = location.getRegion();
1186      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1187        result.completeExceptionally(
1188          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1189        return;
1190      }
1191      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1192        if (!tableName.get().equals(regionInfo.getTable())) {
1193          // tables of this two region should be same.
1194          result.completeExceptionally(
1195            new IllegalArgumentException("Cannot merge regions from two different tables " +
1196              tableName.get() + " and " + regionInfo.getTable()));
1197        } else {
1198          result.complete(tableName.get());
1199        }
1200      }
1201    });
1202  }
1203
1204  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1205    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1206    CompletableFuture<TableName> future = new CompletableFuture<>();
1207    for (byte[] encodedRegionName : encodedRegionNames) {
1208      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1209    }
1210    return future;
1211  }
1212
1213  @Override
1214  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1215    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1216  }
1217
1218  @Override
1219  public CompletableFuture<Boolean> isMergeEnabled() {
1220    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1221  }
1222
1223  @Override
1224  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1225    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1226  }
1227
1228  @Override
1229  public CompletableFuture<Boolean> isSplitEnabled() {
1230    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1231  }
1232
1233  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1234      MasterSwitchType switchType) {
1235    SetSplitOrMergeEnabledRequest request =
1236      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1237    return this.<Boolean> newMasterCaller()
1238      .action((controller, stub) -> this
1239        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1240          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1241          (resp) -> resp.getPrevValueList().get(0)))
1242      .call();
1243  }
1244
1245  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1246    IsSplitOrMergeEnabledRequest request =
1247        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1248    return this
1249        .<Boolean> newMasterCaller()
1250        .action(
1251          (controller, stub) -> this
1252              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1253                controller, stub, request,
1254                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1255                (resp) -> resp.getEnabled())).call();
1256  }
1257
1258  @Override
1259  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1260    if (nameOfRegionsToMerge.size() < 2) {
1261      return failedFuture(new IllegalArgumentException(
1262        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1263    }
1264    CompletableFuture<Void> future = new CompletableFuture<>();
1265    byte[][] encodedNameOfRegionsToMerge =
1266      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1267
1268    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1269      if (err != null) {
1270        future.completeExceptionally(err);
1271        return;
1272      }
1273
1274      MergeTableRegionsRequest request = null;
1275      try {
1276        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1277          forcible, ng.getNonceGroup(), ng.newNonce());
1278      } catch (DeserializationException e) {
1279        future.completeExceptionally(e);
1280        return;
1281      }
1282
1283      addListener(
1284        this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
1285          (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1286          new MergeTableRegionProcedureBiConsumer(tableName)),
1287        (ret, err2) -> {
1288          if (err2 != null) {
1289            future.completeExceptionally(err2);
1290          } else {
1291            future.complete(ret);
1292          }
1293        });
1294    });
1295    return future;
1296  }
1297
1298  @Override
1299  public CompletableFuture<Void> split(TableName tableName) {
1300    CompletableFuture<Void> future = new CompletableFuture<>();
1301    addListener(tableExists(tableName), (exist, error) -> {
1302      if (error != null) {
1303        future.completeExceptionally(error);
1304        return;
1305      }
1306      if (!exist) {
1307        future.completeExceptionally(new TableNotFoundException(tableName));
1308        return;
1309      }
1310      addListener(
1311        metaTable
1312          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1313            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1314            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1315        (results, err2) -> {
1316          if (err2 != null) {
1317            future.completeExceptionally(err2);
1318            return;
1319          }
1320          if (results != null && !results.isEmpty()) {
1321            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1322            for (Result r : results) {
1323              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1324                continue;
1325              }
1326              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1327              if (rl != null) {
1328                for (HRegionLocation h : rl.getRegionLocations()) {
1329                  if (h != null && h.getServerName() != null) {
1330                    RegionInfo hri = h.getRegion();
1331                    if (hri == null || hri.isSplitParent() ||
1332                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1333                      continue;
1334                    }
1335                    splitFutures.add(split(hri, null));
1336                  }
1337                }
1338              }
1339            }
1340            addListener(
1341              CompletableFuture
1342                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1343              (ret, exception) -> {
1344                if (exception != null) {
1345                  future.completeExceptionally(exception);
1346                  return;
1347                }
1348                future.complete(ret);
1349              });
1350          } else {
1351            future.complete(null);
1352          }
1353        });
1354    });
1355    return future;
1356  }
1357
1358  @Override
1359  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1360    CompletableFuture<Void> result = new CompletableFuture<>();
1361    if (splitPoint == null) {
1362      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1363    }
1364    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1365      (loc, err) -> {
1366        if (err != null) {
1367          result.completeExceptionally(err);
1368        } else if (loc == null || loc.getRegion() == null) {
1369          result.completeExceptionally(new IllegalArgumentException(
1370            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1371        } else {
1372          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1373            if (err2 != null) {
1374              result.completeExceptionally(err2);
1375            } else {
1376              result.complete(ret);
1377            }
1378
1379          });
1380        }
1381      });
1382    return result;
1383  }
1384
1385  @Override
1386  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1387    CompletableFuture<Void> future = new CompletableFuture<>();
1388    addListener(getRegionLocation(regionName), (location, err) -> {
1389      if (err != null) {
1390        future.completeExceptionally(err);
1391        return;
1392      }
1393      RegionInfo regionInfo = location.getRegion();
1394      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1395        future
1396          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1397            "Replicas are auto-split when their primary is split."));
1398        return;
1399      }
1400      ServerName serverName = location.getServerName();
1401      if (serverName == null) {
1402        future
1403          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1404        return;
1405      }
1406      addListener(split(regionInfo, null), (ret, err2) -> {
1407        if (err2 != null) {
1408          future.completeExceptionally(err2);
1409        } else {
1410          future.complete(ret);
1411        }
1412      });
1413    });
1414    return future;
1415  }
1416
1417  @Override
1418  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1419    Preconditions.checkNotNull(splitPoint,
1420      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1421    CompletableFuture<Void> future = new CompletableFuture<>();
1422    addListener(getRegionLocation(regionName), (location, err) -> {
1423      if (err != null) {
1424        future.completeExceptionally(err);
1425        return;
1426      }
1427      RegionInfo regionInfo = location.getRegion();
1428      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1429        future
1430          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1431            "Replicas are auto-split when their primary is split."));
1432        return;
1433      }
1434      ServerName serverName = location.getServerName();
1435      if (serverName == null) {
1436        future
1437          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1438        return;
1439      }
1440      if (regionInfo.getStartKey() != null &&
1441        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1442        future.completeExceptionally(
1443          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1444        return;
1445      }
1446      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1447        if (err2 != null) {
1448          future.completeExceptionally(err2);
1449        } else {
1450          future.complete(ret);
1451        }
1452      });
1453    });
1454    return future;
1455  }
1456
1457  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1458    CompletableFuture<Void> future = new CompletableFuture<>();
1459    TableName tableName = hri.getTable();
1460    SplitTableRegionRequest request = null;
1461    try {
1462      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1463        ng.newNonce());
1464    } catch (DeserializationException e) {
1465      future.completeExceptionally(e);
1466      return future;
1467    }
1468
1469    addListener(
1470      this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
1471        request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1472        new SplitTableRegionProcedureBiConsumer(tableName)),
1473      (ret, err2) -> {
1474        if (err2 != null) {
1475          future.completeExceptionally(err2);
1476        } else {
1477          future.complete(ret);
1478        }
1479      });
1480    return future;
1481  }
1482
1483  @Override
1484  public CompletableFuture<Void> assign(byte[] regionName) {
1485    CompletableFuture<Void> future = new CompletableFuture<>();
1486    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1487      if (err != null) {
1488        future.completeExceptionally(err);
1489        return;
1490      }
1491      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1492        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1493          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1494          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1495        .call(), (ret, err2) -> {
1496          if (err2 != null) {
1497            future.completeExceptionally(err2);
1498          } else {
1499            future.complete(ret);
1500          }
1501        });
1502    });
1503    return future;
1504  }
1505
1506  @Override
1507  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1508    CompletableFuture<Void> future = new CompletableFuture<>();
1509    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1510      if (err != null) {
1511        future.completeExceptionally(err);
1512        return;
1513      }
1514      addListener(
1515        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1516          .action(((controller, stub) -> this
1517            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1518              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1519              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1520          .call(),
1521        (ret, err2) -> {
1522          if (err2 != null) {
1523            future.completeExceptionally(err2);
1524          } else {
1525            future.complete(ret);
1526          }
1527        });
1528    });
1529    return future;
1530  }
1531
1532  @Override
1533  public CompletableFuture<Void> offline(byte[] regionName) {
1534    CompletableFuture<Void> future = new CompletableFuture<>();
1535    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1536      if (err != null) {
1537        future.completeExceptionally(err);
1538        return;
1539      }
1540      addListener(
1541        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1542          .action(((controller, stub) -> this
1543            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1544              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1545              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1546          .call(),
1547        (ret, err2) -> {
1548          if (err2 != null) {
1549            future.completeExceptionally(err2);
1550          } else {
1551            future.complete(ret);
1552          }
1553        });
1554    });
1555    return future;
1556  }
1557
1558  @Override
1559  public CompletableFuture<Void> move(byte[] regionName) {
1560    CompletableFuture<Void> future = new CompletableFuture<>();
1561    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1562      if (err != null) {
1563        future.completeExceptionally(err);
1564        return;
1565      }
1566      addListener(
1567        moveRegion(regionInfo,
1568          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1569        (ret, err2) -> {
1570          if (err2 != null) {
1571            future.completeExceptionally(err2);
1572          } else {
1573            future.complete(ret);
1574          }
1575        });
1576    });
1577    return future;
1578  }
1579
1580  @Override
1581  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1582    Preconditions.checkNotNull(destServerName,
1583      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1584    CompletableFuture<Void> future = new CompletableFuture<>();
1585    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1586      if (err != null) {
1587        future.completeExceptionally(err);
1588        return;
1589      }
1590      addListener(
1591        moveRegion(regionInfo, RequestConverter
1592          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1593        (ret, err2) -> {
1594          if (err2 != null) {
1595            future.completeExceptionally(err2);
1596          } else {
1597            future.complete(ret);
1598          }
1599        });
1600    });
1601    return future;
1602  }
1603
1604  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1605    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1606      .action(
1607        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1608          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1609      .call();
1610  }
1611
1612  @Override
1613  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1614    return this
1615        .<Void> newMasterCaller()
1616        .action(
1617          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1618            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1619            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1620  }
1621
1622  @Override
1623  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1624    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1625    Scan scan = QuotaTableUtil.makeScan(filter);
1626    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1627        .scan(scan, new AdvancedScanResultConsumer() {
1628          List<QuotaSettings> settings = new ArrayList<>();
1629
1630          @Override
1631          public void onNext(Result[] results, ScanController controller) {
1632            for (Result result : results) {
1633              try {
1634                QuotaTableUtil.parseResultToCollection(result, settings);
1635              } catch (IOException e) {
1636                controller.terminate();
1637                future.completeExceptionally(e);
1638              }
1639            }
1640          }
1641
1642          @Override
1643          public void onError(Throwable error) {
1644            future.completeExceptionally(error);
1645          }
1646
1647          @Override
1648          public void onComplete() {
1649            future.complete(settings);
1650          }
1651        });
1652    return future;
1653  }
1654
1655  @Override
1656  public CompletableFuture<Void> addReplicationPeer(String peerId,
1657      ReplicationPeerConfig peerConfig, boolean enabled) {
1658    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1659      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1660      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1661      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1662  }
1663
1664  @Override
1665  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1666    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1667      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1668      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1669      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1670  }
1671
1672  @Override
1673  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1674    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1675      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1676      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1677      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1678  }
1679
1680  @Override
1681  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1682    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1683      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1684      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1685      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1686  }
1687
1688  @Override
1689  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1690    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1691      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1692        controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1693        (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1694        (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1695      .call();
1696  }
1697
1698  @Override
1699  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1700      ReplicationPeerConfig peerConfig) {
1701    return this
1702      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1703        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1704        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1705        (resp) -> resp.getProcId(),
1706        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1707  }
1708
1709  @Override
1710  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1711      SyncReplicationState clusterState) {
1712    return this
1713      .<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1714        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1715          clusterState),
1716        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1717        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1718          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1719  }
1720
1721  @Override
1722  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1723      Map<TableName, List<String>> tableCfs) {
1724    if (tableCfs == null) {
1725      return failedFuture(new ReplicationException("tableCfs is null"));
1726    }
1727
1728    CompletableFuture<Void> future = new CompletableFuture<Void>();
1729    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1730      if (!completeExceptionally(future, error)) {
1731        ReplicationPeerConfig newPeerConfig =
1732          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1733        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1734          if (!completeExceptionally(future, error)) {
1735            future.complete(result);
1736          }
1737        });
1738      }
1739    });
1740    return future;
1741  }
1742
1743  @Override
1744  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1745      Map<TableName, List<String>> tableCfs) {
1746    if (tableCfs == null) {
1747      return failedFuture(new ReplicationException("tableCfs is null"));
1748    }
1749
1750    CompletableFuture<Void> future = new CompletableFuture<Void>();
1751    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1752      if (!completeExceptionally(future, error)) {
1753        ReplicationPeerConfig newPeerConfig = null;
1754        try {
1755          newPeerConfig = ReplicationPeerConfigUtil
1756            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1757        } catch (ReplicationException e) {
1758          future.completeExceptionally(e);
1759          return;
1760        }
1761        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1762          if (!completeExceptionally(future, error)) {
1763            future.complete(result);
1764          }
1765        });
1766      }
1767    });
1768    return future;
1769  }
1770
1771  @Override
1772  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1773    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1774  }
1775
1776  @Override
1777  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1778    Preconditions.checkNotNull(pattern,
1779      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1780    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1781  }
1782
1783  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1784      ListReplicationPeersRequest request) {
1785    return this
1786        .<List<ReplicationPeerDescription>> newMasterCaller()
1787        .action(
1788          (controller, stub) -> this
1789              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
1790                controller,
1791                stub,
1792                request,
1793                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1794                (resp) -> resp.getPeerDescList().stream()
1795                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1796                    .collect(Collectors.toList()))).call();
1797  }
1798
1799  @Override
1800  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1801    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1802    addListener(listTableDescriptors(), (tables, error) -> {
1803      if (!completeExceptionally(future, error)) {
1804        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1805        tables.forEach(table -> {
1806          Map<String, Integer> cfs = new HashMap<>();
1807          Stream.of(table.getColumnFamilies())
1808            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1809            .forEach(column -> {
1810              cfs.put(column.getNameAsString(), column.getScope());
1811            });
1812          if (!cfs.isEmpty()) {
1813            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1814          }
1815        });
1816        future.complete(replicatedTableCFs);
1817      }
1818    });
1819    return future;
1820  }
1821
1822  @Override
1823  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1824    SnapshotProtos.SnapshotDescription snapshot =
1825      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1826    try {
1827      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1828    } catch (IllegalArgumentException e) {
1829      return failedFuture(e);
1830    }
1831    CompletableFuture<Void> future = new CompletableFuture<>();
1832    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1833    addListener(this.<Long> newMasterCaller()
1834      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1835        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1836        resp -> resp.getExpectedTimeout()))
1837      .call(), (expectedTimeout, err) -> {
1838        if (err != null) {
1839          future.completeExceptionally(err);
1840          return;
1841        }
1842        TimerTask pollingTask = new TimerTask() {
1843          int tries = 0;
1844          long startTime = EnvironmentEdgeManager.currentTime();
1845          long endTime = startTime + expectedTimeout;
1846          long maxPauseTime = expectedTimeout / maxAttempts;
1847
1848          @Override
1849          public void run(Timeout timeout) throws Exception {
1850            if (EnvironmentEdgeManager.currentTime() < endTime) {
1851              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1852                if (err2 != null) {
1853                  future.completeExceptionally(err2);
1854                } else if (done) {
1855                  future.complete(null);
1856                } else {
1857                  // retry again after pauseTime.
1858                  long pauseTime =
1859                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1860                  pauseTime = Math.min(pauseTime, maxPauseTime);
1861                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1862                    TimeUnit.MILLISECONDS);
1863                }
1864              });
1865            } else {
1866              future.completeExceptionally(
1867                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1868                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1869            }
1870          }
1871        };
1872        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1873      });
1874    return future;
1875  }
1876
1877  @Override
1878  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1879    return this
1880        .<Boolean> newMasterCaller()
1881        .action(
1882          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1883            controller,
1884            stub,
1885            IsSnapshotDoneRequest.newBuilder()
1886                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1887                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1888  }
1889
1890  @Override
1891  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1892    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1893      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1894      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1895    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1896  }
1897
1898  @Override
1899  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1900      boolean restoreAcl) {
1901    CompletableFuture<Void> future = new CompletableFuture<>();
1902    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1903      if (err != null) {
1904        future.completeExceptionally(err);
1905        return;
1906      }
1907      TableName tableName = null;
1908      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1909        for (SnapshotDescription snap : snapshotDescriptions) {
1910          if (snap.getName().equals(snapshotName)) {
1911            tableName = snap.getTableName();
1912            break;
1913          }
1914        }
1915      }
1916      if (tableName == null) {
1917        future.completeExceptionally(new RestoreSnapshotException(
1918          "Unable to find the table name for snapshot=" + snapshotName));
1919        return;
1920      }
1921      final TableName finalTableName = tableName;
1922      addListener(tableExists(finalTableName), (exists, err2) -> {
1923        if (err2 != null) {
1924          future.completeExceptionally(err2);
1925        } else if (!exists) {
1926          // if table does not exist, then just clone snapshot into new table.
1927          completeConditionalOnFuture(future,
1928            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1929        } else {
1930          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1931            if (err4 != null) {
1932              future.completeExceptionally(err4);
1933            } else if (!disabled) {
1934              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1935            } else {
1936              completeConditionalOnFuture(future,
1937                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1938            }
1939          });
1940        }
1941      });
1942    });
1943    return future;
1944  }
1945
1946  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1947      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1948    if (takeFailSafeSnapshot) {
1949      CompletableFuture<Void> future = new CompletableFuture<>();
1950      // Step.1 Take a snapshot of the current state
1951      String failSafeSnapshotSnapshotNameFormat =
1952        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1953          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1954      final String failSafeSnapshotSnapshotName =
1955        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1956          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1957          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1958      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1959      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1960        if (err != null) {
1961          future.completeExceptionally(err);
1962        } else {
1963          // Step.2 Restore snapshot
1964          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1965            (void2, err2) -> {
1966              if (err2 != null) {
1967                // Step.3.a Something went wrong during the restore and try to rollback.
1968                addListener(
1969                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1970                  (void3, err3) -> {
1971                    if (err3 != null) {
1972                      future.completeExceptionally(err3);
1973                    } else {
1974                      String msg =
1975                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1976                          failSafeSnapshotSnapshotName + " succeeded.";
1977                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1978                    }
1979                  });
1980              } else {
1981                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1982                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1983                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1984                  if (err3 != null) {
1985                    LOG.error(
1986                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1987                      err3);
1988                  }
1989                  future.complete(ret3);
1990                });
1991              }
1992            });
1993        }
1994      });
1995      return future;
1996    } else {
1997      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
1998    }
1999  }
2000
2001  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2002      CompletableFuture<T> parentFuture) {
2003    addListener(parentFuture, (res, err) -> {
2004      if (err != null) {
2005        dependentFuture.completeExceptionally(err);
2006      } else {
2007        dependentFuture.complete(res);
2008      }
2009    });
2010  }
2011
2012  @Override
2013  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2014      boolean restoreAcl) {
2015    CompletableFuture<Void> future = new CompletableFuture<>();
2016    addListener(tableExists(tableName), (exists, err) -> {
2017      if (err != null) {
2018        future.completeExceptionally(err);
2019      } else if (exists) {
2020        future.completeExceptionally(new TableExistsException(tableName));
2021      } else {
2022        completeConditionalOnFuture(future,
2023          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2024      }
2025    });
2026    return future;
2027  }
2028
2029  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2030      boolean restoreAcl) {
2031    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2032      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2033    try {
2034      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2035    } catch (IllegalArgumentException e) {
2036      return failedFuture(e);
2037    }
2038    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2039      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2040        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2041          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2042        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2043      .call());
2044  }
2045
2046  @Override
2047  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2048    return getCompletedSnapshots(null);
2049  }
2050
2051  @Override
2052  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2053    Preconditions.checkNotNull(pattern,
2054      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2055    return getCompletedSnapshots(pattern);
2056  }
2057
2058  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2059    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2060        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2061        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2062          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2063          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2064        .call();
2065  }
2066
2067  @Override
2068  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2069    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2070        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2071    return getCompletedSnapshots(tableNamePattern, null);
2072  }
2073
2074  @Override
2075  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2076      Pattern snapshotNamePattern) {
2077    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2078        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2079    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2080        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2081    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2082  }
2083
2084  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2085      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2086    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2087    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2088      if (err != null) {
2089        future.completeExceptionally(err);
2090        return;
2091      }
2092      if (tableNames == null || tableNames.size() <= 0) {
2093        future.complete(Collections.emptyList());
2094        return;
2095      }
2096      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2097        if (err2 != null) {
2098          future.completeExceptionally(err2);
2099          return;
2100        }
2101        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2102          future.complete(Collections.emptyList());
2103          return;
2104        }
2105        future.complete(snapshotDescList.stream()
2106          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2107          .collect(Collectors.toList()));
2108      });
2109    });
2110    return future;
2111  }
2112
2113  @Override
2114  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2115    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2116  }
2117
2118  @Override
2119  public CompletableFuture<Void> deleteSnapshots() {
2120    return internalDeleteSnapshots(null, null);
2121  }
2122
2123  @Override
2124  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2125    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2126        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2127    return internalDeleteSnapshots(null, snapshotNamePattern);
2128  }
2129
2130  @Override
2131  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2132    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2133        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2134    return internalDeleteSnapshots(tableNamePattern, null);
2135  }
2136
2137  @Override
2138  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2139      Pattern snapshotNamePattern) {
2140    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2141        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2142    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2143        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2144    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2145  }
2146
2147  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2148      Pattern snapshotNamePattern) {
2149    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2150    if (tableNamePattern == null) {
2151      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2152    } else {
2153      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2154    }
2155    CompletableFuture<Void> future = new CompletableFuture<>();
2156    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2157      if (err != null) {
2158        future.completeExceptionally(err);
2159        return;
2160      }
2161      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2162        future.complete(null);
2163        return;
2164      }
2165      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2166        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2167          if (e != null) {
2168            future.completeExceptionally(e);
2169          } else {
2170            future.complete(v);
2171          }
2172        });
2173    }));
2174    return future;
2175  }
2176
2177  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2178    return this
2179        .<Void> newMasterCaller()
2180        .action(
2181          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2182            controller,
2183            stub,
2184            DeleteSnapshotRequest.newBuilder()
2185                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2186                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2187  }
2188
2189  @Override
2190  public CompletableFuture<Void> execProcedure(String signature, String instance,
2191      Map<String, String> props) {
2192    CompletableFuture<Void> future = new CompletableFuture<>();
2193    ProcedureDescription procDesc =
2194      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2195    addListener(this.<Long> newMasterCaller()
2196      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2197        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2198        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2199      .call(), (expectedTimeout, err) -> {
2200        if (err != null) {
2201          future.completeExceptionally(err);
2202          return;
2203        }
2204        TimerTask pollingTask = new TimerTask() {
2205          int tries = 0;
2206          long startTime = EnvironmentEdgeManager.currentTime();
2207          long endTime = startTime + expectedTimeout;
2208          long maxPauseTime = expectedTimeout / maxAttempts;
2209
2210          @Override
2211          public void run(Timeout timeout) throws Exception {
2212            if (EnvironmentEdgeManager.currentTime() < endTime) {
2213              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2214                if (err2 != null) {
2215                  future.completeExceptionally(err2);
2216                  return;
2217                }
2218                if (done) {
2219                  future.complete(null);
2220                } else {
2221                  // retry again after pauseTime.
2222                  long pauseTime =
2223                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2224                  pauseTime = Math.min(pauseTime, maxPauseTime);
2225                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2226                    TimeUnit.MICROSECONDS);
2227                }
2228              });
2229            } else {
2230              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2231                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2232            }
2233          }
2234        };
2235        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2236        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2237      });
2238    return future;
2239  }
2240
2241  @Override
2242  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2243      Map<String, String> props) {
2244    ProcedureDescription proDesc =
2245        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2246    return this.<byte[]> newMasterCaller()
2247        .action(
2248          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2249            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2250            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2251            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2252        .call();
2253  }
2254
2255  @Override
2256  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2257      Map<String, String> props) {
2258    ProcedureDescription proDesc =
2259        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2260    return this.<Boolean> newMasterCaller()
2261        .action((controller, stub) -> this
2262            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2263              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2264              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2265        .call();
2266  }
2267
2268  @Override
2269  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2270    return this.<Boolean> newMasterCaller().action(
2271      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2272        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2273        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2274        .call();
2275  }
2276
2277  @Override
2278  public CompletableFuture<String> getProcedures() {
2279    return this
2280        .<String> newMasterCaller()
2281        .action(
2282          (controller, stub) -> this
2283              .<GetProceduresRequest, GetProceduresResponse, String> call(
2284                controller, stub, GetProceduresRequest.newBuilder().build(),
2285                (s, c, req, done) -> s.getProcedures(c, req, done),
2286                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2287  }
2288
2289  @Override
2290  public CompletableFuture<String> getLocks() {
2291    return this
2292        .<String> newMasterCaller()
2293        .action(
2294          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2295            controller, stub, GetLocksRequest.newBuilder().build(),
2296            (s, c, req, done) -> s.getLocks(c, req, done),
2297            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2298  }
2299
2300  @Override
2301  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
2302    return this.<Void> newMasterCaller()
2303        .action((controller, stub) -> this
2304          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2305            controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2306            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2307        .call();
2308  }
2309
2310  @Override
2311  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2312    return this.<List<ServerName>> newMasterCaller()
2313        .action((controller, stub) -> this
2314          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2315            List<ServerName>> call(
2316              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2317              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2318              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2319                  .collect(Collectors.toList())))
2320        .call();
2321  }
2322
2323  @Override
2324  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2325      List<byte[]> encodedRegionNames) {
2326    return this.<Void> newMasterCaller()
2327        .action((controller, stub) -> this
2328          .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
2329            stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2330            (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2331        .call();
2332  }
2333
2334  /**
2335   * Get the region location for the passed region name. The region name may be a full region name
2336   * or encoded region name. If the region does not found, then it'll throw an
2337   * UnknownRegionException wrapped by a {@link CompletableFuture}
2338   * @param regionNameOrEncodedRegionName region name or encoded region name
2339   * @return region location, wrapped by a {@link CompletableFuture}
2340   */
2341  @VisibleForTesting
2342  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2343    if (regionNameOrEncodedRegionName == null) {
2344      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2345    }
2346    try {
2347      CompletableFuture<Optional<HRegionLocation>> future;
2348      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2349        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2350        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2351          // old format encodedName, should be meta region
2352          future = connection.registry.getMetaRegionLocation()
2353            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2354              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2355        } else {
2356          future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2357            regionNameOrEncodedRegionName);
2358        }
2359      } else {
2360        RegionInfo regionInfo =
2361          MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2362        if (regionInfo.isMetaRegion()) {
2363          future = connection.registry.getMetaRegionLocation()
2364            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2365              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2366              .findFirst());
2367        } else {
2368          future =
2369            AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2370        }
2371      }
2372
2373      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2374      addListener(future, (location, err) -> {
2375        if (err != null) {
2376          returnedFuture.completeExceptionally(err);
2377          return;
2378        }
2379        if (!location.isPresent() || location.get().getRegion() == null) {
2380          returnedFuture.completeExceptionally(
2381            new UnknownRegionException("Invalid region name or encoded region name: " +
2382              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2383        } else {
2384          returnedFuture.complete(location.get());
2385        }
2386      });
2387      return returnedFuture;
2388    } catch (IOException e) {
2389      return failedFuture(e);
2390    }
2391  }
2392
2393  /**
2394   * Get the region info for the passed region name. The region name may be a full region name or
2395   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2396   * wrapped by a {@link CompletableFuture}
2397   * @param regionNameOrEncodedRegionName
2398   * @return region info, wrapped by a {@link CompletableFuture}
2399   */
2400  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2401    if (regionNameOrEncodedRegionName == null) {
2402      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2403    }
2404
2405    if (Bytes.equals(regionNameOrEncodedRegionName,
2406      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2407      Bytes.equals(regionNameOrEncodedRegionName,
2408        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2409      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2410    }
2411
2412    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2413    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2414      if (err != null) {
2415        future.completeExceptionally(err);
2416      } else {
2417        future.complete(location.getRegion());
2418      }
2419    });
2420    return future;
2421  }
2422
2423  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2424    if (numRegions < 3) {
2425      throw new IllegalArgumentException("Must create at least three regions");
2426    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2427      throw new IllegalArgumentException("Start key must be smaller than end key");
2428    }
2429    if (numRegions == 3) {
2430      return new byte[][] { startKey, endKey };
2431    }
2432    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2433    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2434      throw new IllegalArgumentException("Unable to split key range into enough regions");
2435    }
2436    return splitKeys;
2437  }
2438
2439  private void verifySplitKeys(byte[][] splitKeys) {
2440    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2441    // Verify there are no duplicate split keys
2442    byte[] lastKey = null;
2443    for (byte[] splitKey : splitKeys) {
2444      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2445        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2446      }
2447      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2448        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2449            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2450      }
2451      lastKey = splitKey;
2452    }
2453  }
2454
2455  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2456
2457    abstract void onFinished();
2458
2459    abstract void onError(Throwable error);
2460
2461    @Override
2462    public void accept(Void v, Throwable error) {
2463      if (error != null) {
2464        onError(error);
2465        return;
2466      }
2467      onFinished();
2468    }
2469  }
2470
2471  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2472    protected final TableName tableName;
2473
2474    TableProcedureBiConsumer(TableName tableName) {
2475      this.tableName = tableName;
2476    }
2477
2478    abstract String getOperationType();
2479
2480    String getDescription() {
2481      return "Operation: " + getOperationType() + ", " + "Table Name: "
2482          + tableName.getNameWithNamespaceInclAsString();
2483    }
2484
2485    @Override
2486    void onFinished() {
2487      LOG.info(getDescription() + " completed");
2488    }
2489
2490    @Override
2491    void onError(Throwable error) {
2492      LOG.info(getDescription() + " failed with " + error.getMessage());
2493    }
2494  }
2495
2496  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2497    protected final String namespaceName;
2498
2499    NamespaceProcedureBiConsumer(String namespaceName) {
2500      this.namespaceName = namespaceName;
2501    }
2502
2503    abstract String getOperationType();
2504
2505    String getDescription() {
2506      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2507    }
2508
2509    @Override
2510    void onFinished() {
2511      LOG.info(getDescription() + " completed");
2512    }
2513
2514    @Override
2515    void onError(Throwable error) {
2516      LOG.info(getDescription() + " failed with " + error.getMessage());
2517    }
2518  }
2519
2520  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2521
2522    CreateTableProcedureBiConsumer(TableName tableName) {
2523      super(tableName);
2524    }
2525
2526    @Override
2527    String getOperationType() {
2528      return "CREATE";
2529    }
2530  }
2531
2532  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2533
2534    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2535      super(tableName);
2536    }
2537
2538    @Override
2539    String getOperationType() {
2540      return "ENABLE";
2541    }
2542  }
2543
2544  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2545
2546    DeleteTableProcedureBiConsumer(TableName tableName) {
2547      super(tableName);
2548    }
2549
2550    @Override
2551    String getOperationType() {
2552      return "DELETE";
2553    }
2554
2555    @Override
2556    void onFinished() {
2557      connection.getLocator().clearCache(this.tableName);
2558      super.onFinished();
2559    }
2560  }
2561
2562  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2563
2564    TruncateTableProcedureBiConsumer(TableName tableName) {
2565      super(tableName);
2566    }
2567
2568    @Override
2569    String getOperationType() {
2570      return "TRUNCATE";
2571    }
2572  }
2573
2574  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2575
2576    EnableTableProcedureBiConsumer(TableName tableName) {
2577      super(tableName);
2578    }
2579
2580    @Override
2581    String getOperationType() {
2582      return "ENABLE";
2583    }
2584  }
2585
2586  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2587
2588    DisableTableProcedureBiConsumer(TableName tableName) {
2589      super(tableName);
2590    }
2591
2592    @Override
2593    String getOperationType() {
2594      return "DISABLE";
2595    }
2596  }
2597
2598  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2599
2600    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2601      super(tableName);
2602    }
2603
2604    @Override
2605    String getOperationType() {
2606      return "ADD_COLUMN_FAMILY";
2607    }
2608  }
2609
2610  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2611
2612    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2613      super(tableName);
2614    }
2615
2616    @Override
2617    String getOperationType() {
2618      return "DELETE_COLUMN_FAMILY";
2619    }
2620  }
2621
2622  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2623
2624    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2625      super(tableName);
2626    }
2627
2628    @Override
2629    String getOperationType() {
2630      return "MODIFY_COLUMN_FAMILY";
2631    }
2632  }
2633
2634  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2635
2636    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2637      super(namespaceName);
2638    }
2639
2640    @Override
2641    String getOperationType() {
2642      return "CREATE_NAMESPACE";
2643    }
2644  }
2645
2646  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2647
2648    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2649      super(namespaceName);
2650    }
2651
2652    @Override
2653    String getOperationType() {
2654      return "DELETE_NAMESPACE";
2655    }
2656  }
2657
2658  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2659
2660    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2661      super(namespaceName);
2662    }
2663
2664    @Override
2665    String getOperationType() {
2666      return "MODIFY_NAMESPACE";
2667    }
2668  }
2669
2670  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2671
2672    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2673      super(tableName);
2674    }
2675
2676    @Override
2677    String getOperationType() {
2678      return "MERGE_REGIONS";
2679    }
2680  }
2681
2682  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2683
2684    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2685      super(tableName);
2686    }
2687
2688    @Override
2689    String getOperationType() {
2690      return "SPLIT_REGION";
2691    }
2692  }
2693
2694  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2695    private final String peerId;
2696    private final Supplier<String> getOperation;
2697
2698    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2699      this.peerId = peerId;
2700      this.getOperation = getOperation;
2701    }
2702
2703    String getDescription() {
2704      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2705    }
2706
2707    @Override
2708    void onFinished() {
2709      LOG.info(getDescription() + " completed");
2710    }
2711
2712    @Override
2713    void onError(Throwable error) {
2714      LOG.info(getDescription() + " failed with " + error.getMessage());
2715    }
2716  }
2717
2718  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2719    CompletableFuture<Void> future = new CompletableFuture<>();
2720    addListener(procFuture, (procId, error) -> {
2721      if (error != null) {
2722        future.completeExceptionally(error);
2723        return;
2724      }
2725      getProcedureResult(procId, future, 0);
2726    });
2727    return future;
2728  }
2729
2730  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2731    addListener(
2732      this.<GetProcedureResultResponse> newMasterCaller()
2733        .action((controller, stub) -> this
2734          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2735            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2736            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2737        .call(),
2738      (response, error) -> {
2739        if (error != null) {
2740          LOG.warn("failed to get the procedure result procId={}", procId,
2741            ConnectionUtils.translateException(error));
2742          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2743            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2744          return;
2745        }
2746        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2747          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2748            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2749          return;
2750        }
2751        if (response.hasException()) {
2752          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2753          future.completeExceptionally(ioe);
2754        } else {
2755          future.complete(null);
2756        }
2757      });
2758  }
2759
2760  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2761    CompletableFuture<T> future = new CompletableFuture<>();
2762    future.completeExceptionally(error);
2763    return future;
2764  }
2765
2766  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2767    if (error != null) {
2768      future.completeExceptionally(error);
2769      return true;
2770    }
2771    return false;
2772  }
2773
2774  @Override
2775  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2776    return getClusterMetrics(EnumSet.allOf(Option.class));
2777  }
2778
2779  @Override
2780  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2781    return this
2782        .<ClusterMetrics> newMasterCaller()
2783        .action(
2784          (controller, stub) -> this
2785              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2786                stub, RequestConverter.buildGetClusterStatusRequest(options),
2787                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2788                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2789  }
2790
2791  @Override
2792  public CompletableFuture<Void> shutdown() {
2793    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2794      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2795        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2796        resp -> null))
2797      .call();
2798  }
2799
2800  @Override
2801  public CompletableFuture<Void> stopMaster() {
2802    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2803      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2804        controller, stub, StopMasterRequest.newBuilder().build(),
2805        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2806      .call();
2807  }
2808
2809  @Override
2810  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2811    StopServerRequest request = RequestConverter
2812      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2813    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2814      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2815        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2816        resp -> null))
2817      .serverName(serverName).call();
2818  }
2819
2820  @Override
2821  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2822    return this
2823        .<Void> newAdminCaller()
2824        .action(
2825          (controller, stub) -> this
2826              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2827                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2828                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2829        .serverName(serverName).call();
2830  }
2831
2832  @Override
2833  public CompletableFuture<Void> updateConfiguration() {
2834    CompletableFuture<Void> future = new CompletableFuture<Void>();
2835    addListener(
2836      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2837      (status, err) -> {
2838        if (err != null) {
2839          future.completeExceptionally(err);
2840        } else {
2841          List<CompletableFuture<Void>> futures = new ArrayList<>();
2842          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2843          futures.add(updateConfiguration(status.getMasterName()));
2844          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2845          addListener(
2846            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2847            (result, err2) -> {
2848              if (err2 != null) {
2849                future.completeExceptionally(err2);
2850              } else {
2851                future.complete(result);
2852              }
2853            });
2854        }
2855      });
2856    return future;
2857  }
2858
2859  @Override
2860  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2861    return this
2862        .<Void> newAdminCaller()
2863        .action(
2864          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2865            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2866            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2867        .serverName(serverName).call();
2868  }
2869
2870  @Override
2871  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2872    return this
2873        .<Void> newAdminCaller()
2874        .action(
2875          (controller, stub) -> this
2876              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2877                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2878                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2879        .serverName(serverName).call();
2880  }
2881
2882  @Override
2883  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2884    return this
2885        .<List<SecurityCapability>> newMasterCaller()
2886        .action(
2887          (controller, stub) -> this
2888              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
2889                controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
2890                    done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
2891                    .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
2892  }
2893
2894  @Override
2895  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2896    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2897  }
2898
2899  @Override
2900  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2901      TableName tableName) {
2902    Preconditions.checkNotNull(tableName,
2903      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2904    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2905  }
2906
2907  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2908      ServerName serverName) {
2909    return this.<List<RegionMetrics>> newAdminCaller()
2910        .action((controller, stub) -> this
2911            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2912              adminCall(controller, stub, request, (s, c, req, done) ->
2913                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2914        .serverName(serverName).call();
2915  }
2916
2917  @Override
2918  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2919    return this
2920        .<Boolean> newMasterCaller()
2921        .action(
2922          (controller, stub) -> this
2923              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2924                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2925                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2926                resp -> resp.getInMaintenanceMode())).call();
2927  }
2928
  /**
   * Reports the compaction state of a table for the given compaction type.
   * <ul>
   * <li>{@code MOB}: looks up the master address from the connection registry, builds the MOB
   * region info for the table and sends a region-info request to that server; the result is the
   * compaction state in the response, or {@code NONE} when the response carries none.</li>
   * <li>{@code NORMAL}: queries every located region that has a server, has a region and is not
   * offline, then merges the per-region states into a single table state.</li>
   * </ul>
   * @param tableName the table to inspect
   * @param compactType which kind of compaction to report on
   * @return a future with the table's compaction state; it fails if any lookup or per-region
   *         query fails
   * @throws IllegalArgumentException thrown synchronously for any other {@code compactType}
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
      CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          // The request for the MOB region is addressed to the server the registry returned.
          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this
              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
                controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // States that did not settle the answer on their own; merged after all queries finish.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // A failed region query fails the whole call. If any region is
                // MAJOR_AND_MINOR, the table is MAJOR_AND_MINOR too, so finish early.
                if (err2 != null) {
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If the future is not completed yet, merge the collected region states:
              // seeing both MAJOR and MINOR yields MAJOR_AND_MINOR, otherwise the state is
              // MAJOR, MINOR or NONE depending on what was seen.
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // Only reached with the future still open when the loop found no mixed state.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }
3024
3025  @Override
3026  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3027    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3028    addListener(getRegionLocation(regionName), (location, err) -> {
3029      if (err != null) {
3030        future.completeExceptionally(err);
3031        return;
3032      }
3033      ServerName serverName = location.getServerName();
3034      if (serverName == null) {
3035        future
3036          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3037        return;
3038      }
3039      addListener(
3040        this.<GetRegionInfoResponse> newAdminCaller()
3041          .action((controller, stub) -> this
3042            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3043              controller, stub,
3044              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3045                true),
3046              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3047          .serverName(serverName).call(),
3048        (resp2, err2) -> {
3049          if (err2 != null) {
3050            future.completeExceptionally(err2);
3051          } else {
3052            if (resp2.hasCompactionState()) {
3053              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3054            } else {
3055              future.complete(CompactionState.NONE);
3056            }
3057          }
3058        });
3059    });
3060    return future;
3061  }
3062
3063  @Override
3064  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3065    MajorCompactionTimestampRequest request =
3066        MajorCompactionTimestampRequest.newBuilder()
3067            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3068    return this
3069        .<Optional<Long>> newMasterCaller()
3070        .action(
3071          (controller, stub) -> this
3072              .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
3073                controller, stub, request,
3074                (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3075                ProtobufUtil::toOptionalTimestamp)).call();
3076  }
3077
3078  @Override
3079  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3080      byte[] regionName) {
3081    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3082    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3083    addListener(getRegionInfo(regionName), (region, err) -> {
3084      if (err != null) {
3085        future.completeExceptionally(err);
3086        return;
3087      }
3088      MajorCompactionTimestampForRegionRequest.Builder builder =
3089        MajorCompactionTimestampForRegionRequest.newBuilder();
3090      builder.setRegion(
3091        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3092      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3093        .<MajorCompactionTimestampForRegionRequest,
3094        MajorCompactionTimestampResponse, Optional<Long>> call(
3095          controller, stub, builder.build(),
3096          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3097          ProtobufUtil::toOptionalTimestamp))
3098        .call(), (timestamp, err2) -> {
3099          if (err2 != null) {
3100            future.completeExceptionally(err2);
3101          } else {
3102            future.complete(timestamp);
3103          }
3104        });
3105    });
3106    return future;
3107  }
3108
3109  @Override
3110  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3111      List<String> serverNamesList) {
3112    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3113    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3114      if (err != null) {
3115        future.completeExceptionally(err);
3116        return;
3117      }
3118      // Accessed by multiple threads.
3119      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3120      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3121      serverNames.stream().forEach(serverName -> {
3122        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3123          if (err2 != null) {
3124            future.completeExceptionally(unwrapCompletionException(err2));
3125          } else {
3126            serverStates.put(serverName, serverState);
3127          }
3128        }));
3129      });
3130      addListener(
3131        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3132        (ret, err3) -> {
3133          if (!future.isCompletedExceptionally()) {
3134            if (err3 != null) {
3135              future.completeExceptionally(err3);
3136            } else {
3137              future.complete(serverStates);
3138            }
3139          }
3140        });
3141    });
3142    return future;
3143  }
3144
3145  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3146    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3147    if (serverNamesList.isEmpty()) {
3148      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3149        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3150      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3151        if (err != null) {
3152          future.completeExceptionally(err);
3153        } else {
3154          future.complete(clusterMetrics.getServersName());
3155        }
3156      });
3157      return future;
3158    } else {
3159      List<ServerName> serverList = new ArrayList<>();
3160      for (String regionServerName : serverNamesList) {
3161        ServerName serverName = null;
3162        try {
3163          serverName = ServerName.valueOf(regionServerName);
3164        } catch (Exception e) {
3165          future.completeExceptionally(
3166            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3167        }
3168        if (serverName == null) {
3169          future.completeExceptionally(
3170            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3171        } else {
3172          serverList.add(serverName);
3173        }
3174      }
3175      future.complete(serverList);
3176    }
3177    return future;
3178  }
3179
3180  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3181    return this
3182        .<Boolean>newAdminCaller()
3183        .serverName(serverName)
3184        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3185            Boolean>adminCall(controller, stub,
3186            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3187            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3188  }
3189
3190  @Override
3191  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3192    return this.<Boolean> newMasterCaller()
3193      .action((controller, stub) -> this
3194        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3195          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3196          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3197          (resp) -> resp.getPrevBalanceValue()))
3198      .call();
3199  }
3200
3201  @Override
3202  public CompletableFuture<Boolean> balance(boolean forcible) {
3203    return this
3204        .<Boolean> newMasterCaller()
3205        .action(
3206          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3207            stub, RequestConverter.buildBalanceRequest(forcible),
3208            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3209  }
3210
3211  @Override
3212  public CompletableFuture<Boolean> isBalancerEnabled() {
3213    return this
3214        .<Boolean> newMasterCaller()
3215        .action(
3216          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
3217            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3218            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3219        .call();
3220  }
3221
3222  @Override
3223  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3224    return this
3225        .<Boolean> newMasterCaller()
3226        .action(
3227          (controller, stub) -> this
3228              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3229                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3230                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3231                    .getPrevNormalizerValue())).call();
3232  }
3233
3234  @Override
3235  public CompletableFuture<Boolean> isNormalizerEnabled() {
3236    return this
3237        .<Boolean> newMasterCaller()
3238        .action(
3239          (controller, stub) -> this
3240              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3241                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3242                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3243                (resp) -> resp.getEnabled())).call();
3244  }
3245
3246  @Override
3247  public CompletableFuture<Boolean> normalize() {
3248    return this
3249        .<Boolean> newMasterCaller()
3250        .action(
3251          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3252            controller, stub, RequestConverter.buildNormalizeRequest(),
3253            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3254        .call();
3255  }
3256
3257  @Override
3258  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3259    return this
3260        .<Boolean> newMasterCaller()
3261        .action(
3262          (controller, stub) -> this
3263              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3264                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3265                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3266                    .getPrevValue())).call();
3267  }
3268
3269  @Override
3270  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3271    return this
3272        .<Boolean> newMasterCaller()
3273        .action(
3274          (controller, stub) -> this
3275              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3276                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3277                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3278        .call();
3279  }
3280
3281  @Override
3282  public CompletableFuture<Boolean> runCleanerChore() {
3283    return this
3284        .<Boolean> newMasterCaller()
3285        .action(
3286          (controller, stub) -> this
3287              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3288                RequestConverter.buildRunCleanerChoreRequest(),
3289                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3290                (resp) -> resp.getCleanerChoreRan())).call();
3291  }
3292
3293  @Override
3294  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3295    return this
3296        .<Boolean> newMasterCaller()
3297        .action(
3298          (controller, stub) -> this
3299              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3300                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3301                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3302                    .getPrevValue())).call();
3303  }
3304
3305  @Override
3306  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3307    return this
3308        .<Boolean> newMasterCaller()
3309        .action(
3310          (controller, stub) -> this
3311              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3312                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3313                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3314                    .getValue())).call();
3315  }
3316
3317  @Override
3318  public CompletableFuture<Integer> runCatalogJanitor() {
3319    return this
3320        .<Integer> newMasterCaller()
3321        .action(
3322          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3323            controller, stub, RequestConverter.buildCatalogScanRequest(),
3324            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3325        .call();
3326  }
3327
3328  @Override
3329  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3330      ServiceCaller<S, R> callable) {
3331    MasterCoprocessorRpcChannelImpl channel =
3332        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3333    S stub = stubMaker.apply(channel);
3334    CompletableFuture<R> future = new CompletableFuture<>();
3335    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3336    callable.call(stub, controller, resp -> {
3337      if (controller.failed()) {
3338        future.completeExceptionally(controller.getFailed());
3339      } else {
3340        future.complete(resp);
3341      }
3342    });
3343    return future;
3344  }
3345
3346  @Override
3347  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3348      ServiceCaller<S, R> callable, ServerName serverName) {
3349    RegionServerCoprocessorRpcChannelImpl channel =
3350        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3351          serverName));
3352    S stub = stubMaker.apply(channel);
3353    CompletableFuture<R> future = new CompletableFuture<>();
3354    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3355    callable.call(stub, controller, resp -> {
3356      if (controller.failed()) {
3357        future.completeExceptionally(controller.getFailed());
3358      } else {
3359        future.complete(resp);
3360      }
3361    });
3362    return future;
3363  }
3364
3365  @Override
3366  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3367    return this.<List<ServerName>> newMasterCaller()
3368      .action((controller, stub) -> this
3369        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3370          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3371          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3372          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3373      .call();
3374  }
3375
3376  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3377    return this.connection.callerFactory.<T> serverRequest()
3378      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3379      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3380      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3381      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3382  }
3383
3384  @Override
3385  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3386    if (tableName == null) {
3387      return failedFuture(new IllegalArgumentException("Table name is null"));
3388    }
3389    CompletableFuture<Void> future = new CompletableFuture<>();
3390    addListener(tableExists(tableName), (exist, err) -> {
3391      if (err != null) {
3392        future.completeExceptionally(err);
3393        return;
3394      }
3395      if (!exist) {
3396        future.completeExceptionally(new TableNotFoundException(
3397          "Table '" + tableName.getNameAsString() + "' does not exists."));
3398        return;
3399      }
3400      addListener(getTableSplits(tableName), (splits, err1) -> {
3401        if (err1 != null) {
3402          future.completeExceptionally(err1);
3403        } else {
3404          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3405            if (err2 != null) {
3406              future.completeExceptionally(err2);
3407            } else {
3408              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3409                if (err3 != null) {
3410                  future.completeExceptionally(err3);
3411                } else {
3412                  future.complete(result3);
3413                }
3414              });
3415            }
3416          });
3417        }
3418      });
3419    });
3420    return future;
3421  }
3422
3423  @Override
3424  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3425    if (tableName == null) {
3426      return failedFuture(new IllegalArgumentException("Table name is null"));
3427    }
3428    CompletableFuture<Void> future = new CompletableFuture<>();
3429    addListener(tableExists(tableName), (exist, err) -> {
3430      if (err != null) {
3431        future.completeExceptionally(err);
3432        return;
3433      }
3434      if (!exist) {
3435        future.completeExceptionally(new TableNotFoundException(
3436          "Table '" + tableName.getNameAsString() + "' does not exists."));
3437        return;
3438      }
3439      addListener(setTableReplication(tableName, false), (result, err2) -> {
3440        if (err2 != null) {
3441          future.completeExceptionally(err2);
3442        } else {
3443          future.complete(result);
3444        }
3445      });
3446    });
3447    return future;
3448  }
3449
3450  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3451    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3452    addListener(
3453      getRegions(tableName).thenApply(regions -> regions.stream()
3454        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3455      (regions, err2) -> {
3456        if (err2 != null) {
3457          future.completeExceptionally(err2);
3458          return;
3459        }
3460        if (regions.size() == 1) {
3461          future.complete(null);
3462        } else {
3463          byte[][] splits = new byte[regions.size() - 1][];
3464          for (int i = 1; i < regions.size(); i++) {
3465            splits[i - 1] = regions.get(i).getStartKey();
3466          }
3467          future.complete(splits);
3468        }
3469      });
3470    return future;
3471  }
3472
3473  /**
3474   * Connect to peer and check the table descriptor on peer:
3475   * <ol>
3476   * <li>Create the same table on peer when not exist.</li>
3477   * <li>Throw an exception if the table already has replication enabled on any of the column
3478   * families.</li>
3479   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3480   * </ol>
3481   * @param tableName name of the table to sync to the peer
3482   * @param splits table split keys
3483   */
3484  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3485      byte[][] splits) {
3486    CompletableFuture<Void> future = new CompletableFuture<>();
3487    addListener(listReplicationPeers(), (peers, err) -> {
3488      if (err != null) {
3489        future.completeExceptionally(err);
3490        return;
3491      }
3492      if (peers == null || peers.size() <= 0) {
3493        future.completeExceptionally(
3494          new IllegalArgumentException("Found no peer cluster for replication."));
3495        return;
3496      }
3497      List<CompletableFuture<Void>> futures = new ArrayList<>();
3498      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3499        .forEach(peer -> {
3500          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3501        });
3502      addListener(
3503        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3504        (result, err2) -> {
3505          if (err2 != null) {
3506            future.completeExceptionally(err2);
3507          } else {
3508            future.complete(result);
3509          }
3510        });
3511    });
3512    return future;
3513  }
3514
3515  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3516      ReplicationPeerDescription peer) {
3517    Configuration peerConf = null;
3518    try {
3519      peerConf =
3520        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3521    } catch (IOException e) {
3522      return failedFuture(e);
3523    }
3524    CompletableFuture<Void> future = new CompletableFuture<>();
3525    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3526      if (err != null) {
3527        future.completeExceptionally(err);
3528        return;
3529      }
3530      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3531        if (err1 != null) {
3532          future.completeExceptionally(err1);
3533          return;
3534        }
3535        AsyncAdmin peerAdmin = conn.getAdmin();
3536        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3537          if (err2 != null) {
3538            future.completeExceptionally(err2);
3539            return;
3540          }
3541          if (!exist) {
3542            CompletableFuture<Void> createTableFuture = null;
3543            if (splits == null) {
3544              createTableFuture = peerAdmin.createTable(tableDesc);
3545            } else {
3546              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3547            }
3548            addListener(createTableFuture, (result, err3) -> {
3549              if (err3 != null) {
3550                future.completeExceptionally(err3);
3551              } else {
3552                future.complete(result);
3553              }
3554            });
3555          } else {
3556            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3557              (result, err4) -> {
3558                if (err4 != null) {
3559                  future.completeExceptionally(err4);
3560                } else {
3561                  future.complete(result);
3562                }
3563              });
3564          }
3565        });
3566      });
3567    });
3568    return future;
3569  }
3570
3571  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3572      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3573    CompletableFuture<Void> future = new CompletableFuture<>();
3574    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3575      if (err != null) {
3576        future.completeExceptionally(err);
3577        return;
3578      }
3579      if (peerTableDesc == null) {
3580        future.completeExceptionally(
3581          new IllegalArgumentException("Failed to get table descriptor for table " +
3582            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3583        return;
3584      }
3585      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3586        future.completeExceptionally(new IllegalArgumentException(
3587          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3588            ", but the table descriptors are not same when compared with source cluster." +
3589            " Thus can not enable the table's replication switch."));
3590        return;
3591      }
3592      future.complete(null);
3593    });
3594    return future;
3595  }
3596
3597  /**
3598   * Set the table's replication switch if the table's replication switch is already not set.
3599   * @param tableName name of the table
3600   * @param enableRep is replication switch enable or disable
3601   */
3602  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3603    CompletableFuture<Void> future = new CompletableFuture<>();
3604    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3605      if (err != null) {
3606        future.completeExceptionally(err);
3607        return;
3608      }
3609      if (!tableDesc.matchReplicationScope(enableRep)) {
3610        int scope =
3611          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3612        TableDescriptor newTableDesc =
3613          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3614        addListener(modifyTable(newTableDesc), (result, err2) -> {
3615          if (err2 != null) {
3616            future.completeExceptionally(err2);
3617          } else {
3618            future.complete(result);
3619          }
3620        });
3621      } else {
3622        future.complete(null);
3623      }
3624    });
3625    return future;
3626  }
3627
3628  @Override
3629  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3630    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3631    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3632      if (err != null) {
3633        future.completeExceptionally(err);
3634        return;
3635      }
3636      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3637        locations.stream().filter(l -> l.getRegion() != null)
3638          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3639          .collect(Collectors.groupingBy(l -> l.getServerName(),
3640            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3641      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3642      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3643      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3644        futures
3645          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3646            if (err2 != null) {
3647              future.completeExceptionally(unwrapCompletionException(err2));
3648            } else {
3649              aggregator.append(stats);
3650            }
3651          }));
3652      }
3653      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3654        (ret, err3) -> {
3655          if (err3 != null) {
3656            future.completeExceptionally(unwrapCompletionException(err3));
3657          } else {
3658            future.complete(aggregator.sum());
3659          }
3660        });
3661    });
3662    return future;
3663  }
3664
3665  @Override
3666  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3667      boolean preserveSplits) {
3668    CompletableFuture<Void> future = new CompletableFuture<>();
3669    addListener(tableExists(tableName), (exist, err) -> {
3670      if (err != null) {
3671        future.completeExceptionally(err);
3672        return;
3673      }
3674      if (!exist) {
3675        future.completeExceptionally(new TableNotFoundException(tableName));
3676        return;
3677      }
3678      addListener(tableExists(newTableName), (exist1, err1) -> {
3679        if (err1 != null) {
3680          future.completeExceptionally(err1);
3681          return;
3682        }
3683        if (exist1) {
3684          future.completeExceptionally(new TableExistsException(newTableName));
3685          return;
3686        }
3687        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3688          if (err2 != null) {
3689            future.completeExceptionally(err2);
3690            return;
3691          }
3692          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3693          if (preserveSplits) {
3694            addListener(getTableSplits(tableName), (splits, err3) -> {
3695              if (err3 != null) {
3696                future.completeExceptionally(err3);
3697              } else {
3698                addListener(
3699                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3700                  (result, err4) -> {
3701                    if (err4 != null) {
3702                      future.completeExceptionally(err4);
3703                    } else {
3704                      future.complete(result);
3705                    }
3706                  });
3707              }
3708            });
3709          } else {
3710            addListener(createTable(newTableDesc), (result, err5) -> {
3711              if (err5 != null) {
3712                future.completeExceptionally(err5);
3713              } else {
3714                future.complete(result);
3715              }
3716            });
3717          }
3718        });
3719      });
3720    });
3721    return future;
3722  }
3723
3724  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3725      List<RegionInfo> hris) {
3726    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3727      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3728        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3729        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3730        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3731      .serverName(serverName).call();
3732  }
3733
3734  @Override
3735  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3736    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3737        .action((controller, stub) -> this
3738            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3739              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3740              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3741              resp -> resp.getPreviousRpcThrottleEnabled()))
3742        .call();
3743    return future;
3744  }
3745
3746  @Override
3747  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3748    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3749        .action((controller, stub) -> this
3750            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3751              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3752              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3753              resp -> resp.getRpcThrottleEnabled()))
3754        .call();
3755    return future;
3756  }
3757
3758  @Override
3759  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3760    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3761        .action((controller, stub) -> this
3762            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3763              controller, stub,
3764              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3765                  .build(),
3766              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3767              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3768        .call();
3769    return future;
3770  }
3771
3772  @Override
3773  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3774    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3775      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3776      Map<TableName, Long>> call(controller, stub,
3777        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3778        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3779        resp -> resp.getSizesList().stream().collect(Collectors
3780          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3781      .call();
3782  }
3783
3784  @Override
3785  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3786      ServerName serverName) {
3787    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3788      .action((controller, stub) -> this
3789        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3790        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3791          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3792          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3793          resp -> resp.getSnapshotsList().stream()
3794            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3795              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3796      .serverName(serverName).call();
3797  }
3798
3799  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3800      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3801    return this.<SpaceQuotaSnapshot> newMasterCaller()
3802      .action((controller, stub) -> this
3803        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3804          RequestConverter.buildGetQuotaStatesRequest(),
3805          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3806      .call();
3807  }
3808
3809  @Override
3810  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3811    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3812      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3813      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3814  }
3815
3816  @Override
3817  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3818    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3819    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3820      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3821      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3822  }
3823
3824  @Override
3825  public CompletableFuture<Void> grant(UserPermission userPermission,
3826      boolean mergeExistingPermissions) {
3827    return this.<Void> newMasterCaller()
3828        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3829          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3830          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3831        .call();
3832  }
3833
3834  @Override
3835  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3836    return this.<Void> newMasterCaller()
3837        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3838          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3839          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3840        .call();
3841  }
3842
3843  @Override
3844  public CompletableFuture<List<UserPermission>>
3845      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3846    return this.<List<UserPermission>> newMasterCaller().action((controller,
3847        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3848            List<UserPermission>> call(controller, stub,
3849              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3850              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3851              resp -> resp.getUserPermissionList().stream()
3852                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3853                .collect(Collectors.toList())))
3854        .call();
3855  }
3856
3857  @Override
3858  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3859      List<Permission> permissions) {
3860    return this.<List<Boolean>> newMasterCaller()
3861        .action((controller, stub) -> this
3862            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3863              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3864              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3865              resp -> resp.getHasUserPermissionList()))
3866        .call();
3867  }
3868
3869  @Override
3870  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3871      final boolean sync) {
3872    return this.<Boolean>newMasterCaller()
3873        .action((controller, stub) -> this
3874            .call(controller, stub,
3875                RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3876                MasterService.Interface::switchSnapshotCleanup,
3877                SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
3878        .call();
3879  }
3880
3881  @Override
3882  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3883    return this.<Boolean>newMasterCaller()
3884        .action((controller, stub) -> this
3885            .call(controller, stub,
3886                RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3887                MasterService.Interface::isSnapshotCleanupEnabled,
3888                IsSnapshotCleanupEnabledResponse::getEnabled))
3889        .call();
3890  }
3891
3892}