001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.ClusterMetrics;
053import org.apache.hadoop.hbase.ClusterMetrics.Option;
054import org.apache.hadoop.hbase.ClusterMetricsBuilder;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.MetaTableAccessor;
058import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
059import org.apache.hadoop.hbase.NamespaceDescriptor;
060import org.apache.hadoop.hbase.RegionLocations;
061import org.apache.hadoop.hbase.RegionMetrics;
062import org.apache.hadoop.hbase.RegionMetricsBuilder;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableExistsException;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.TableNotDisabledException;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.UnknownRegionException;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.Scan.ReadType;
074import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
075import org.apache.hadoop.hbase.client.replication.TableCFs;
076import org.apache.hadoop.hbase.client.security.SecurityCapability;
077import org.apache.hadoop.hbase.exceptions.DeserializationException;
078import org.apache.hadoop.hbase.ipc.HBaseRpcController;
079import org.apache.hadoop.hbase.net.Address;
080import org.apache.hadoop.hbase.quotas.QuotaFilter;
081import org.apache.hadoop.hbase.quotas.QuotaSettings;
082import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
083import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
084import org.apache.hadoop.hbase.replication.ReplicationException;
085import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
086import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
087import org.apache.hadoop.hbase.replication.SyncReplicationState;
088import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
090import org.apache.hadoop.hbase.security.access.Permission;
091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
092import org.apache.hadoop.hbase.security.access.UserPermission;
093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.util.Bytes;
097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
098import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
099import org.apache.hadoop.hbase.util.Pair;
100import org.apache.yetus.audience.InterfaceAudience;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
105import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
106import org.apache.hbase.thirdparty.com.google.protobuf.Message;
107import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
108import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
109import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
110import org.apache.hbase.thirdparty.io.netty.util.Timeout;
111import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
112import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
113
114import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
115import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
331
332/**
333 * The implementation of AsyncAdmin.
334 * <p>
335 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
336 * be finished inside the rpc framework thread, which means that the callbacks registered to the
337 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
338 * this class should not try to do time consuming tasks in the callbacks.
339 * @since 2.0.0
340 * @see AsyncHBaseAdmin
341 * @see AsyncConnection#getAdmin()
342 * @see AsyncConnection#getAdminBuilder()
343 */
344@InterfaceAudience.Private
345class RawAsyncHBaseAdmin implements AsyncAdmin {
346
347  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
348
349  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
350
351  private final AsyncConnectionImpl connection;
352
353  private final HashedWheelTimer retryTimer;
354
355  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
356
357  private final long rpcTimeoutNs;
358
359  private final long operationTimeoutNs;
360
361  private final long pauseNs;
362
363  private final long pauseForCQTBENs;
364
365  private final int maxAttempts;
366
367  private final int startLogErrorsCnt;
368
369  private final NonceGenerator ng;
370
371  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
372      AsyncAdminBuilderBase builder) {
373    this.connection = connection;
374    this.retryTimer = retryTimer;
375    this.metaTable = connection.getTable(META_TABLE_NAME);
376    this.rpcTimeoutNs = builder.rpcTimeoutNs;
377    this.operationTimeoutNs = builder.operationTimeoutNs;
378    this.pauseNs = builder.pauseNs;
379    if (builder.pauseForCQTBENs < builder.pauseNs) {
380      LOG.warn(
381        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
382          " the normal pause value {} ms, use the greater one instead",
383        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
384        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
385      this.pauseForCQTBENs = builder.pauseNs;
386    } else {
387      this.pauseForCQTBENs = builder.pauseForCQTBENs;
388    }
389    this.maxAttempts = builder.maxAttempts;
390    this.startLogErrorsCnt = builder.startLogErrorsCnt;
391    this.ng = connection.getNonceGenerator();
392  }
393
394  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
395    return this.connection.callerFactory.<T> masterRequest()
396      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
397      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
398      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
399      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
400  }
401
402  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
403    return this.connection.callerFactory.<T> adminRequest()
404      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
405      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
406      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
407      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
408  }
409
410  @FunctionalInterface
411  private interface MasterRpcCall<RESP, REQ> {
412    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
413        RpcCallback<RESP> done);
414  }
415
416  @FunctionalInterface
417  private interface AdminRpcCall<RESP, REQ> {
418    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
419        RpcCallback<RESP> done);
420  }
421
422  @FunctionalInterface
423  private interface Converter<D, S> {
424    D convert(S src) throws IOException;
425  }
426
427  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
428      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
429      Converter<RESP, PRESP> respConverter) {
430    CompletableFuture<RESP> future = new CompletableFuture<>();
431    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
432
433      @Override
434      public void run(PRESP resp) {
435        if (controller.failed()) {
436          future.completeExceptionally(controller.getFailed());
437        } else {
438          try {
439            future.complete(respConverter.convert(resp));
440          } catch (IOException e) {
441            future.completeExceptionally(e);
442          }
443        }
444      }
445    });
446    return future;
447  }
448
449  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
450      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
451      Converter<RESP, PRESP> respConverter) {
452    CompletableFuture<RESP> future = new CompletableFuture<>();
453    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
454
455      @Override
456      public void run(PRESP resp) {
457        if (controller.failed()) {
458          future.completeExceptionally(controller.getFailed());
459        } else {
460          try {
461            future.complete(respConverter.convert(resp));
462          } catch (IOException e) {
463            future.completeExceptionally(e);
464          }
465        }
466      }
467    });
468    return future;
469  }
470
471  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
472      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
473      ProcedureBiConsumer consumer) {
474    return procedureCall(b -> {
475    }, preq, rpcCall, respConverter, consumer);
476  }
477
478  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
479      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
480      ProcedureBiConsumer consumer) {
481    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
482  }
483
484  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
485      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
486      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
487      ProcedureBiConsumer consumer) {
488    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
489        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
490    prioritySetter.accept(builder);
491    CompletableFuture<Long> procFuture = builder.call();
492    CompletableFuture<Void> future = waitProcedureResult(procFuture);
493    addListener(future, consumer);
494    return future;
495  }
496
497  @FunctionalInterface
498  private interface TableOperator {
499    CompletableFuture<Void> operate(TableName table);
500  }
501
502  @Override
503  public CompletableFuture<Boolean> tableExists(TableName tableName) {
504    if (TableName.isMetaTableName(tableName)) {
505      return CompletableFuture.completedFuture(true);
506    }
507    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
508  }
509
510  @Override
511  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
512    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
513      includeSysTables));
514  }
515
516  /**
517   * {@link #listTableDescriptors(boolean)}
518   */
519  @Override
520  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
521      boolean includeSysTables) {
522    Preconditions.checkNotNull(pattern,
523      "pattern is null. If you don't specify a pattern, "
524          + "use listTableDescriptors(boolean) instead");
525    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
526      includeSysTables));
527  }
528
529  @Override
530  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
531    Preconditions.checkNotNull(tableNames,
532      "tableNames is null. If you don't specify tableNames, "
533          + "use listTableDescriptors(boolean) instead");
534    if (tableNames.isEmpty()) {
535      return CompletableFuture.completedFuture(Collections.emptyList());
536    }
537    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
538  }
539
540  private CompletableFuture<List<TableDescriptor>>
541      getTableDescriptors(GetTableDescriptorsRequest request) {
542    return this.<List<TableDescriptor>> newMasterCaller()
543        .action((controller, stub) -> this
544            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
545              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
546              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
547        .call();
548  }
549
550  @Override
551  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
552    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
553  }
554
555  @Override
556  public CompletableFuture<List<TableName>>
557      listTableNames(Pattern pattern, boolean includeSysTables) {
558    Preconditions.checkNotNull(pattern,
559        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
560    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
561  }
562
563  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
564    return this
565        .<List<TableName>> newMasterCaller()
566        .action(
567          (controller, stub) -> this
568              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
569                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
570                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
571  }
572
573  @Override
574  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
575    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
576        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
577        List<TableDescriptor>> call(
578          controller, stub,
579          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
580          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
581          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
582        .call();
583  }
584
585  @Override
586  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
587    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
588        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
589        List<TableName>> call(
590          controller, stub,
591          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
592          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
593          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
594        .call();
595  }
596
597  @Override
598  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
599    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
600    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
601      .action((controller, stub) -> this
602        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
603          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
604          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
605          (resp) -> resp.getTableSchemaList()))
606      .call(), (tableSchemas, error) -> {
607        if (error != null) {
608          future.completeExceptionally(error);
609          return;
610        }
611        if (!tableSchemas.isEmpty()) {
612          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
613        } else {
614          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
615        }
616      });
617    return future;
618  }
619
620  @Override
621  public CompletableFuture<Void> createTable(TableDescriptor desc) {
622    return createTable(desc.getTableName(),
623      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
624  }
625
626  @Override
627  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
628      int numRegions) {
629    try {
630      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
631    } catch (IllegalArgumentException e) {
632      return failedFuture(e);
633    }
634  }
635
636  @Override
637  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
638    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
639        + " use createTable(TableDescriptor) instead");
640    try {
641      verifySplitKeys(splitKeys);
642      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
643        splitKeys, ng.getNonceGroup(), ng.newNonce()));
644    } catch (IllegalArgumentException e) {
645      return failedFuture(e);
646    }
647  }
648
649  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
650    Preconditions.checkNotNull(tableName, "table name is null");
651    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
652      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
653      new CreateTableProcedureBiConsumer(tableName));
654  }
655
656  @Override
657  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
658    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
659      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
660        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
661      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
662  }
663
664  @Override
665  public CompletableFuture<Void> deleteTable(TableName tableName) {
666    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
667      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
668      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
669      new DeleteTableProcedureBiConsumer(tableName));
670  }
671
672  @Override
673  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
674    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
675      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
676        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
677      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
678  }
679
680  @Override
681  public CompletableFuture<Void> enableTable(TableName tableName) {
682    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
683      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
684      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
685      new EnableTableProcedureBiConsumer(tableName));
686  }
687
688  @Override
689  public CompletableFuture<Void> disableTable(TableName tableName) {
690    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
691      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
692      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
693      new DisableTableProcedureBiConsumer(tableName));
694  }
695
696  /**
697   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
698   * using passed parameters. Sets error or boolean result ('true' if table matches
699   * the passed-in targetState).
700   */
701  private static CompletableFuture<Boolean> completeCheckTableState(
702      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
703      TableState.State targetState, TableName tableName) {
704    if (error != null) {
705      future.completeExceptionally(error);
706    } else {
707      if (tableState != null) {
708        future.complete(tableState.inStates(targetState));
709      } else {
710        future.completeExceptionally(new TableNotFoundException(tableName));
711      }
712    }
713    return future;
714  }
715
716  @Override
717  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
718    if (TableName.isMetaTableName(tableName)) {
719      return CompletableFuture.completedFuture(true);
720    }
721    CompletableFuture<Boolean> future = new CompletableFuture<>();
722    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
723      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
724        TableState.State.ENABLED, tableName);
725    });
726    return future;
727  }
728
729  @Override
730  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
731    if (TableName.isMetaTableName(tableName)) {
732      return CompletableFuture.completedFuture(false);
733    }
734    CompletableFuture<Boolean> future = new CompletableFuture<>();
735    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (tableState, error) -> {
736      completeCheckTableState(future, tableState.isPresent()? tableState.get(): null, error,
737        TableState.State.DISABLED, tableName);
738    });
739    return future;
740  }
741
742  @Override
743  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
744    if (TableName.isMetaTableName(tableName)) {
745      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
746        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
747    }
748    CompletableFuture<Boolean> future = new CompletableFuture<>();
749    addListener(isTableEnabled(tableName), (enabled, error) -> {
750      if (error != null) {
751        if (error instanceof TableNotFoundException) {
752          future.complete(false);
753        } else {
754          future.completeExceptionally(error);
755        }
756        return;
757      }
758      if (!enabled) {
759        future.complete(false);
760      } else {
761        addListener(
762          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
763          (locations, error1) -> {
764            if (error1 != null) {
765              future.completeExceptionally(error1);
766              return;
767            }
768            List<HRegionLocation> notDeployedRegions = locations.stream()
769              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
770            if (notDeployedRegions.size() > 0) {
771              if (LOG.isDebugEnabled()) {
772                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
773              }
774              future.complete(false);
775              return;
776            }
777            future.complete(true);
778          });
779      }
780    });
781    return future;
782  }
783
784  @Override
785  public CompletableFuture<Void> addColumnFamily(
786      TableName tableName, ColumnFamilyDescriptor columnFamily) {
787    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
788      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
789        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
790      new AddColumnFamilyProcedureBiConsumer(tableName));
791  }
792
793  @Override
794  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
795    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
796      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
797        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
798      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
799  }
800
801  @Override
802  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
803      ColumnFamilyDescriptor columnFamily) {
804    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
805      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
806        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
807      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
808  }
809
810  @Override
811  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
812    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
813      RequestConverter.buildCreateNamespaceRequest(descriptor),
814      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
815      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
816  }
817
818  @Override
819  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
820    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
821      RequestConverter.buildModifyNamespaceRequest(descriptor),
822      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
823      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
824  }
825
826  @Override
827  public CompletableFuture<Void> deleteNamespace(String name) {
828    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
829      RequestConverter.buildDeleteNamespaceRequest(name),
830      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
831      new DeleteNamespaceProcedureBiConsumer(name));
832  }
833
834  @Override
835  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
836    return this
837        .<NamespaceDescriptor> newMasterCaller()
838        .action(
839          (controller, stub) -> this
840              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
841                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
842                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
843                      -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
844  }
845
846  @Override
847  public CompletableFuture<List<String>> listNamespaces() {
848    return this
849        .<List<String>> newMasterCaller()
850        .action(
851          (controller, stub) -> this
852              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
853                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
854                  done) -> s.listNamespaces(c, req, done),
855                (resp) -> resp.getNamespaceNameList())).call();
856  }
857
858  @Override
859  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
860    return this
861        .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
862              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
863                  List<NamespaceDescriptor>> call(controller, stub,
864                  ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
865                      s.listNamespaceDescriptors(c, req, done),
866                    (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
867  }
868
869  @Override
870  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
871    return this.<List<RegionInfo>> newAdminCaller()
872        .action((controller, stub) -> this
873            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
874              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
875              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
876              resp -> ProtobufUtil.getRegionInfos(resp)))
877        .serverName(serverName).call();
878  }
879
880  @Override
881  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
882    if (tableName.equals(META_TABLE_NAME)) {
883      return connection.registry.getMetaRegionLocations()
884        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
885          .collect(Collectors.toList()));
886    } else {
887      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
888        .thenApply(
889          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
890    }
891  }
892
893  @Override
894  public CompletableFuture<Void> flush(TableName tableName) {
895    CompletableFuture<Void> future = new CompletableFuture<>();
896    addListener(tableExists(tableName), (exists, err) -> {
897      if (err != null) {
898        future.completeExceptionally(err);
899      } else if (!exists) {
900        future.completeExceptionally(new TableNotFoundException(tableName));
901      } else {
902        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
903          if (err2 != null) {
904            future.completeExceptionally(err2);
905          } else if (!tableEnabled) {
906            future.completeExceptionally(new TableNotEnabledException(tableName));
907          } else {
908            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
909              new HashMap<>()), (ret, err3) -> {
910                if (err3 != null) {
911                  future.completeExceptionally(err3);
912                } else {
913                  future.complete(ret);
914                }
915              });
916          }
917        });
918      }
919    });
920    return future;
921  }
922
923  @Override
924  public CompletableFuture<Void> flushRegion(byte[] regionName) {
925    return flushRegionInternal(regionName, false).thenAccept(r -> {
926    });
927  }
928
929  /**
930   * This method is for internal use only, where we need the response of the flush.
931   * <p/>
932   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
933   * API.
934   */
935  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
936      boolean writeFlushWALMarker) {
937    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
938    addListener(getRegionLocation(regionName), (location, err) -> {
939      if (err != null) {
940        future.completeExceptionally(err);
941        return;
942      }
943      ServerName serverName = location.getServerName();
944      if (serverName == null) {
945        future
946          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
947        return;
948      }
949      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
950        if (err2 != null) {
951          future.completeExceptionally(err2);
952        } else {
953          future.complete(ret);
954        }
955      });
956    });
957    return future;
958  }
959
960  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
961      boolean writeFlushWALMarker) {
962    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
963      .action((controller, stub) -> this
964        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
965          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
966          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
967      .call();
968  }
969
970  @Override
971  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
972    CompletableFuture<Void> future = new CompletableFuture<>();
973    addListener(getRegions(sn), (hRegionInfos, err) -> {
974      if (err != null) {
975        future.completeExceptionally(err);
976        return;
977      }
978      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
979      if (hRegionInfos != null) {
980        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
981        })));
982      }
983      addListener(CompletableFuture.allOf(
984        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
985          if (err2 != null) {
986            future.completeExceptionally(err2);
987          } else {
988            future.complete(ret);
989          }
990        });
991    });
992    return future;
993  }
994
995  @Override
996  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
997    return compact(tableName, null, false, compactType);
998  }
999
1000  @Override
1001  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1002      CompactType compactType) {
1003    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1004        + "If you don't specify a columnFamily, use compact(TableName) instead");
1005    return compact(tableName, columnFamily, false, compactType);
1006  }
1007
1008  @Override
1009  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1010    return compactRegion(regionName, null, false);
1011  }
1012
1013  @Override
1014  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1015    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1016        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1017    return compactRegion(regionName, columnFamily, false);
1018  }
1019
1020  @Override
1021  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1022    return compact(tableName, null, true, compactType);
1023  }
1024
1025  @Override
1026  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1027      CompactType compactType) {
1028    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1029        + "If you don't specify a columnFamily, use compact(TableName) instead");
1030    return compact(tableName, columnFamily, true, compactType);
1031  }
1032
1033  @Override
1034  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1035    return compactRegion(regionName, null, true);
1036  }
1037
1038  @Override
1039  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1040    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1041        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1042    return compactRegion(regionName, columnFamily, true);
1043  }
1044
1045  @Override
1046  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1047    return compactRegionServer(sn, false);
1048  }
1049
1050  @Override
1051  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1052    return compactRegionServer(sn, true);
1053  }
1054
1055  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1056    CompletableFuture<Void> future = new CompletableFuture<>();
1057    addListener(getRegions(sn), (hRegionInfos, err) -> {
1058      if (err != null) {
1059        future.completeExceptionally(err);
1060        return;
1061      }
1062      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1063      if (hRegionInfos != null) {
1064        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1065      }
1066      addListener(CompletableFuture.allOf(
1067        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1068          if (err2 != null) {
1069            future.completeExceptionally(err2);
1070          } else {
1071            future.complete(ret);
1072          }
1073        });
1074    });
1075    return future;
1076  }
1077
1078  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1079      boolean major) {
1080    CompletableFuture<Void> future = new CompletableFuture<>();
1081    addListener(getRegionLocation(regionName), (location, err) -> {
1082      if (err != null) {
1083        future.completeExceptionally(err);
1084        return;
1085      }
1086      ServerName serverName = location.getServerName();
1087      if (serverName == null) {
1088        future
1089          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1090        return;
1091      }
1092      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1093        (ret, err2) -> {
1094          if (err2 != null) {
1095            future.completeExceptionally(err2);
1096          } else {
1097            future.complete(ret);
1098          }
1099        });
1100    });
1101    return future;
1102  }
1103
1104  /**
1105   * List all region locations for the specific table.
1106   */
1107  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1108    if (TableName.META_TABLE_NAME.equals(tableName)) {
1109      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1110      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1111        if (err != null) {
1112          future.completeExceptionally(err);
1113        } else if (metaRegions == null || metaRegions.isEmpty() ||
1114          metaRegions.getDefaultRegionLocation() == null) {
1115          future.completeExceptionally(new IOException("meta region does not found"));
1116        } else {
1117          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1118        }
1119      });
1120      return future;
1121    } else {
1122      // For non-meta table, we fetch all locations by scanning hbase:meta table
1123      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1124    }
1125  }
1126
1127  /**
1128   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1129   */
1130  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1131      CompactType compactType) {
1132    CompletableFuture<Void> future = new CompletableFuture<>();
1133
1134    switch (compactType) {
1135      case MOB:
1136        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1137          if (err != null) {
1138            future.completeExceptionally(err);
1139            return;
1140          }
1141          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1142          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1143            if (err2 != null) {
1144              future.completeExceptionally(err2);
1145            } else {
1146              future.complete(ret);
1147            }
1148          });
1149        });
1150        break;
1151      case NORMAL:
1152        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1153          if (err != null) {
1154            future.completeExceptionally(err);
1155            return;
1156          }
1157          if (locations == null || locations.isEmpty()) {
1158            future.completeExceptionally(new TableNotFoundException(tableName));
1159          }
1160          CompletableFuture<?>[] compactFutures =
1161            locations.stream().filter(l -> l.getRegion() != null)
1162              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1163              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1164              .toArray(CompletableFuture<?>[]::new);
1165          // future complete unless all of the compact futures are completed.
1166          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1167            if (err2 != null) {
1168              future.completeExceptionally(err2);
1169            } else {
1170              future.complete(ret);
1171            }
1172          });
1173        });
1174        break;
1175      default:
1176        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1177    }
1178    return future;
1179  }
1180
1181  /**
1182   * Compact the region at specific region server.
1183   */
1184  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1185      final boolean major, byte[] columnFamily) {
1186    return this
1187        .<Void> newAdminCaller()
1188        .serverName(sn)
1189        .action(
1190          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1191            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1192              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1193            resp -> null)).call();
1194  }
1195
1196  private byte[] toEncodeRegionName(byte[] regionName) {
1197    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1198      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1199  }
1200
1201  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1202      CompletableFuture<TableName> result) {
1203    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1204      if (err != null) {
1205        result.completeExceptionally(err);
1206        return;
1207      }
1208      RegionInfo regionInfo = location.getRegion();
1209      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1210        result.completeExceptionally(
1211          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1212        return;
1213      }
1214      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1215        if (!tableName.get().equals(regionInfo.getTable())) {
1216          // tables of this two region should be same.
1217          result.completeExceptionally(
1218            new IllegalArgumentException("Cannot merge regions from two different tables " +
1219              tableName.get() + " and " + regionInfo.getTable()));
1220        } else {
1221          result.complete(tableName.get());
1222        }
1223      }
1224    });
1225  }
1226
1227  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1228    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1229    CompletableFuture<TableName> future = new CompletableFuture<>();
1230    for (byte[] encodedRegionName : encodedRegionNames) {
1231      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1232    }
1233    return future;
1234  }
1235
1236  @Override
1237  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1238    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1239  }
1240
1241  @Override
1242  public CompletableFuture<Boolean> isMergeEnabled() {
1243    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1244  }
1245
1246  @Override
1247  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1248    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1249  }
1250
1251  @Override
1252  public CompletableFuture<Boolean> isSplitEnabled() {
1253    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1254  }
1255
1256  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1257      MasterSwitchType switchType) {
1258    SetSplitOrMergeEnabledRequest request =
1259      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1260    return this.<Boolean> newMasterCaller()
1261      .action((controller, stub) -> this
1262        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1263          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1264          (resp) -> resp.getPrevValueList().get(0)))
1265      .call();
1266  }
1267
1268  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1269    IsSplitOrMergeEnabledRequest request =
1270        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1271    return this
1272        .<Boolean> newMasterCaller()
1273        .action(
1274          (controller, stub) -> this
1275              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1276                controller, stub, request,
1277                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1278                (resp) -> resp.getEnabled())).call();
1279  }
1280
1281  @Override
1282  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1283    if (nameOfRegionsToMerge.size() < 2) {
1284      return failedFuture(new IllegalArgumentException(
1285        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1286    }
1287    CompletableFuture<Void> future = new CompletableFuture<>();
1288    byte[][] encodedNameOfRegionsToMerge =
1289      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1290
1291    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1292      if (err != null) {
1293        future.completeExceptionally(err);
1294        return;
1295      }
1296
1297      MergeTableRegionsRequest request = null;
1298      try {
1299        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1300          forcible, ng.getNonceGroup(), ng.newNonce());
1301      } catch (DeserializationException e) {
1302        future.completeExceptionally(e);
1303        return;
1304      }
1305
1306      addListener(
1307        this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
1308          (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1309          new MergeTableRegionProcedureBiConsumer(tableName)),
1310        (ret, err2) -> {
1311          if (err2 != null) {
1312            future.completeExceptionally(err2);
1313          } else {
1314            future.complete(ret);
1315          }
1316        });
1317    });
1318    return future;
1319  }
1320
1321  @Override
1322  public CompletableFuture<Void> split(TableName tableName) {
1323    CompletableFuture<Void> future = new CompletableFuture<>();
1324    addListener(tableExists(tableName), (exist, error) -> {
1325      if (error != null) {
1326        future.completeExceptionally(error);
1327        return;
1328      }
1329      if (!exist) {
1330        future.completeExceptionally(new TableNotFoundException(tableName));
1331        return;
1332      }
1333      addListener(
1334        metaTable
1335          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1336            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1337            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1338        (results, err2) -> {
1339          if (err2 != null) {
1340            future.completeExceptionally(err2);
1341            return;
1342          }
1343          if (results != null && !results.isEmpty()) {
1344            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1345            for (Result r : results) {
1346              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1347                continue;
1348              }
1349              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1350              if (rl != null) {
1351                for (HRegionLocation h : rl.getRegionLocations()) {
1352                  if (h != null && h.getServerName() != null) {
1353                    RegionInfo hri = h.getRegion();
1354                    if (hri == null || hri.isSplitParent() ||
1355                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1356                      continue;
1357                    }
1358                    splitFutures.add(split(hri, null));
1359                  }
1360                }
1361              }
1362            }
1363            addListener(
1364              CompletableFuture
1365                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1366              (ret, exception) -> {
1367                if (exception != null) {
1368                  future.completeExceptionally(exception);
1369                  return;
1370                }
1371                future.complete(ret);
1372              });
1373          } else {
1374            future.complete(null);
1375          }
1376        });
1377    });
1378    return future;
1379  }
1380
1381  @Override
1382  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1383    CompletableFuture<Void> result = new CompletableFuture<>();
1384    if (splitPoint == null) {
1385      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1386    }
1387    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1388      (loc, err) -> {
1389        if (err != null) {
1390          result.completeExceptionally(err);
1391        } else if (loc == null || loc.getRegion() == null) {
1392          result.completeExceptionally(new IllegalArgumentException(
1393            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1394        } else {
1395          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1396            if (err2 != null) {
1397              result.completeExceptionally(err2);
1398            } else {
1399              result.complete(ret);
1400            }
1401
1402          });
1403        }
1404      });
1405    return result;
1406  }
1407
1408  @Override
1409  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1410    CompletableFuture<Void> future = new CompletableFuture<>();
1411    addListener(getRegionLocation(regionName), (location, err) -> {
1412      if (err != null) {
1413        future.completeExceptionally(err);
1414        return;
1415      }
1416      RegionInfo regionInfo = location.getRegion();
1417      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1418        future
1419          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1420            "Replicas are auto-split when their primary is split."));
1421        return;
1422      }
1423      ServerName serverName = location.getServerName();
1424      if (serverName == null) {
1425        future
1426          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1427        return;
1428      }
1429      addListener(split(regionInfo, null), (ret, err2) -> {
1430        if (err2 != null) {
1431          future.completeExceptionally(err2);
1432        } else {
1433          future.complete(ret);
1434        }
1435      });
1436    });
1437    return future;
1438  }
1439
1440  @Override
1441  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1442    Preconditions.checkNotNull(splitPoint,
1443      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1444    CompletableFuture<Void> future = new CompletableFuture<>();
1445    addListener(getRegionLocation(regionName), (location, err) -> {
1446      if (err != null) {
1447        future.completeExceptionally(err);
1448        return;
1449      }
1450      RegionInfo regionInfo = location.getRegion();
1451      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1452        future
1453          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1454            "Replicas are auto-split when their primary is split."));
1455        return;
1456      }
1457      ServerName serverName = location.getServerName();
1458      if (serverName == null) {
1459        future
1460          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1461        return;
1462      }
1463      if (regionInfo.getStartKey() != null &&
1464        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1465        future.completeExceptionally(
1466          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1467        return;
1468      }
1469      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1470        if (err2 != null) {
1471          future.completeExceptionally(err2);
1472        } else {
1473          future.complete(ret);
1474        }
1475      });
1476    });
1477    return future;
1478  }
1479
1480  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1481    CompletableFuture<Void> future = new CompletableFuture<>();
1482    TableName tableName = hri.getTable();
1483    SplitTableRegionRequest request = null;
1484    try {
1485      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1486        ng.newNonce());
1487    } catch (DeserializationException e) {
1488      future.completeExceptionally(e);
1489      return future;
1490    }
1491
1492    addListener(
1493      this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
1494        request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1495        new SplitTableRegionProcedureBiConsumer(tableName)),
1496      (ret, err2) -> {
1497        if (err2 != null) {
1498          future.completeExceptionally(err2);
1499        } else {
1500          future.complete(ret);
1501        }
1502      });
1503    return future;
1504  }
1505
1506  @Override
1507  public CompletableFuture<Void> assign(byte[] regionName) {
1508    CompletableFuture<Void> future = new CompletableFuture<>();
1509    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1510      if (err != null) {
1511        future.completeExceptionally(err);
1512        return;
1513      }
1514      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1515        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1516          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1517          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1518        .call(), (ret, err2) -> {
1519          if (err2 != null) {
1520            future.completeExceptionally(err2);
1521          } else {
1522            future.complete(ret);
1523          }
1524        });
1525    });
1526    return future;
1527  }
1528
1529  @Override
1530  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1531    CompletableFuture<Void> future = new CompletableFuture<>();
1532    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1533      if (err != null) {
1534        future.completeExceptionally(err);
1535        return;
1536      }
1537      addListener(
1538        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1539          .action(((controller, stub) -> this
1540            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1541              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1542              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1543          .call(),
1544        (ret, err2) -> {
1545          if (err2 != null) {
1546            future.completeExceptionally(err2);
1547          } else {
1548            future.complete(ret);
1549          }
1550        });
1551    });
1552    return future;
1553  }
1554
1555  @Override
1556  public CompletableFuture<Void> offline(byte[] regionName) {
1557    CompletableFuture<Void> future = new CompletableFuture<>();
1558    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1559      if (err != null) {
1560        future.completeExceptionally(err);
1561        return;
1562      }
1563      addListener(
1564        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1565          .action(((controller, stub) -> this
1566            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1567              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1568              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1569          .call(),
1570        (ret, err2) -> {
1571          if (err2 != null) {
1572            future.completeExceptionally(err2);
1573          } else {
1574            future.complete(ret);
1575          }
1576        });
1577    });
1578    return future;
1579  }
1580
1581  @Override
1582  public CompletableFuture<Void> move(byte[] regionName) {
1583    CompletableFuture<Void> future = new CompletableFuture<>();
1584    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1585      if (err != null) {
1586        future.completeExceptionally(err);
1587        return;
1588      }
1589      addListener(
1590        moveRegion(regionInfo,
1591          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1592        (ret, err2) -> {
1593          if (err2 != null) {
1594            future.completeExceptionally(err2);
1595          } else {
1596            future.complete(ret);
1597          }
1598        });
1599    });
1600    return future;
1601  }
1602
1603  @Override
1604  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1605    Preconditions.checkNotNull(destServerName,
1606      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1607    CompletableFuture<Void> future = new CompletableFuture<>();
1608    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1609      if (err != null) {
1610        future.completeExceptionally(err);
1611        return;
1612      }
1613      addListener(
1614        moveRegion(regionInfo, RequestConverter
1615          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1616        (ret, err2) -> {
1617          if (err2 != null) {
1618            future.completeExceptionally(err2);
1619          } else {
1620            future.complete(ret);
1621          }
1622        });
1623    });
1624    return future;
1625  }
1626
1627  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1628    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1629      .action(
1630        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1631          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1632      .call();
1633  }
1634
1635  @Override
1636  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1637    return this
1638        .<Void> newMasterCaller()
1639        .action(
1640          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1641            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1642            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1643  }
1644
1645  @Override
1646  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1647    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1648    Scan scan = QuotaTableUtil.makeScan(filter);
1649    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1650        .scan(scan, new AdvancedScanResultConsumer() {
1651          List<QuotaSettings> settings = new ArrayList<>();
1652
1653          @Override
1654          public void onNext(Result[] results, ScanController controller) {
1655            for (Result result : results) {
1656              try {
1657                QuotaTableUtil.parseResultToCollection(result, settings);
1658              } catch (IOException e) {
1659                controller.terminate();
1660                future.completeExceptionally(e);
1661              }
1662            }
1663          }
1664
1665          @Override
1666          public void onError(Throwable error) {
1667            future.completeExceptionally(error);
1668          }
1669
1670          @Override
1671          public void onComplete() {
1672            future.complete(settings);
1673          }
1674        });
1675    return future;
1676  }
1677
1678  @Override
1679  public CompletableFuture<Void> addReplicationPeer(String peerId,
1680      ReplicationPeerConfig peerConfig, boolean enabled) {
1681    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1682      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1683      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1684      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1685  }
1686
1687  @Override
1688  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1689    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1690      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1691      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1692      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1693  }
1694
1695  @Override
1696  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1697    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1698      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1699      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1700      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1701  }
1702
1703  @Override
1704  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1705    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1706      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1707      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1708      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1709  }
1710
1711  @Override
1712  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1713    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1714      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
1715          call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1716            (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1717            (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1718  }
1719
1720  @Override
1721  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1722      ReplicationPeerConfig peerConfig) {
1723    return this
1724      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1725        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1726        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1727        (resp) -> resp.getProcId(),
1728        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1729  }
1730
1731  @Override
1732  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1733      SyncReplicationState clusterState) {
1734    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1735        TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1736        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1737          clusterState),
1738          (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1739          (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1740            () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1741  }
1742
1743  @Override
1744  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1745      Map<TableName, List<String>> tableCfs) {
1746    if (tableCfs == null) {
1747      return failedFuture(new ReplicationException("tableCfs is null"));
1748    }
1749
1750    CompletableFuture<Void> future = new CompletableFuture<Void>();
1751    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1752      if (!completeExceptionally(future, error)) {
1753        ReplicationPeerConfig newPeerConfig =
1754          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1755        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1756          if (!completeExceptionally(future, error)) {
1757            future.complete(result);
1758          }
1759        });
1760      }
1761    });
1762    return future;
1763  }
1764
1765  @Override
1766  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1767      Map<TableName, List<String>> tableCfs) {
1768    if (tableCfs == null) {
1769      return failedFuture(new ReplicationException("tableCfs is null"));
1770    }
1771
1772    CompletableFuture<Void> future = new CompletableFuture<Void>();
1773    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1774      if (!completeExceptionally(future, error)) {
1775        ReplicationPeerConfig newPeerConfig = null;
1776        try {
1777          newPeerConfig = ReplicationPeerConfigUtil
1778            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1779        } catch (ReplicationException e) {
1780          future.completeExceptionally(e);
1781          return;
1782        }
1783        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1784          if (!completeExceptionally(future, error)) {
1785            future.complete(result);
1786          }
1787        });
1788      }
1789    });
1790    return future;
1791  }
1792
1793  @Override
1794  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1795    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1796  }
1797
1798  @Override
1799  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1800    Preconditions.checkNotNull(pattern,
1801      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1802    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1803  }
1804
1805  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1806      ListReplicationPeersRequest request) {
1807    return this
1808        .<List<ReplicationPeerDescription>> newMasterCaller()
1809        .action(
1810          (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1811              List<ReplicationPeerDescription>> call(controller, stub, request,
1812                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1813                (resp) -> resp.getPeerDescList().stream()
1814                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1815                    .collect(Collectors.toList()))).call();
1816  }
1817
1818  @Override
1819  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1820    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1821    addListener(listTableDescriptors(), (tables, error) -> {
1822      if (!completeExceptionally(future, error)) {
1823        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1824        tables.forEach(table -> {
1825          Map<String, Integer> cfs = new HashMap<>();
1826          Stream.of(table.getColumnFamilies())
1827            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1828            .forEach(column -> {
1829              cfs.put(column.getNameAsString(), column.getScope());
1830            });
1831          if (!cfs.isEmpty()) {
1832            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1833          }
1834        });
1835        future.complete(replicatedTableCFs);
1836      }
1837    });
1838    return future;
1839  }
1840
  // Implementation outline:
  // 1. Convert and validate the snapshot description; an invalid one fails the future at once.
  // 2. Send the snapshot request to the master, which answers with an expected timeout.
  // 3. Poll isSnapshotFinished() from a timer task until it reports done, an error occurs, or
  //    the expected timeout has elapsed.
  @Override
  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
    SnapshotProtos.SnapshotDescription snapshot =
      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
    try {
      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
    } catch (IllegalArgumentException e) {
      return failedFuture(e);
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
    addListener(this.<Long> newMasterCaller()
      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
        resp -> resp.getExpectedTimeout()))
      .call(), (expectedTimeout, err) -> {
        if (err != null) {
          future.completeExceptionally(err);
          return;
        }
        TimerTask pollingTask = new TimerTask() {
          // Number of polls that found the snapshot unfinished; feeds the pause computation.
          int tries = 0;
          long startTime = EnvironmentEdgeManager.currentTime();
          // Polling stops once the current time reaches this deadline. expectedTimeout is
          // treated as milliseconds (see the error message below).
          long endTime = startTime + expectedTimeout;
          // Upper bound for the pause between two polls.
          long maxPauseTime = expectedTimeout / maxAttempts;

          @Override
          public void run(Timeout timeout) throws Exception {
            if (EnvironmentEdgeManager.currentTime() < endTime) {
              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
                if (err2 != null) {
                  future.completeExceptionally(err2);
                } else if (done) {
                  future.complete(null);
                } else {
                  // retry again after pauseTime.
                  long pauseTime =
                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
                  pauseTime = Math.min(pauseTime, maxPauseTime);
                  // 'this' is the enclosing TimerTask: the same task is scheduled again.
                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
                    TimeUnit.MILLISECONDS);
                }
              });
            } else {
              // Deadline reached without the snapshot being reported as finished.
              future.completeExceptionally(
                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
            }
          }
        };
        // Schedule the first poll to run after 1 ms.
        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
      });
    return future;
  }
1895
1896  @Override
1897  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1898    return this
1899        .<Boolean> newMasterCaller()
1900        .action(
1901          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1902            controller,
1903            stub,
1904            IsSnapshotDoneRequest.newBuilder()
1905                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1906                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1907  }
1908
1909  @Override
1910  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1911    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1912      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1913      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1914    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1915  }
1916
1917  @Override
1918  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1919      boolean restoreAcl) {
1920    CompletableFuture<Void> future = new CompletableFuture<>();
1921    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1922      if (err != null) {
1923        future.completeExceptionally(err);
1924        return;
1925      }
1926      TableName tableName = null;
1927      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1928        for (SnapshotDescription snap : snapshotDescriptions) {
1929          if (snap.getName().equals(snapshotName)) {
1930            tableName = snap.getTableName();
1931            break;
1932          }
1933        }
1934      }
1935      if (tableName == null) {
1936        future.completeExceptionally(new RestoreSnapshotException(
1937          "Unable to find the table name for snapshot=" + snapshotName));
1938        return;
1939      }
1940      final TableName finalTableName = tableName;
1941      addListener(tableExists(finalTableName), (exists, err2) -> {
1942        if (err2 != null) {
1943          future.completeExceptionally(err2);
1944        } else if (!exists) {
1945          // if table does not exist, then just clone snapshot into new table.
1946          completeConditionalOnFuture(future,
1947            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1948        } else {
1949          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1950            if (err4 != null) {
1951              future.completeExceptionally(err4);
1952            } else if (!disabled) {
1953              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1954            } else {
1955              completeConditionalOnFuture(future,
1956                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1957            }
1958          });
1959        }
1960      });
1961    });
1962    return future;
1963  }
1964
1965  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1966      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1967    if (takeFailSafeSnapshot) {
1968      CompletableFuture<Void> future = new CompletableFuture<>();
1969      // Step.1 Take a snapshot of the current state
1970      String failSafeSnapshotSnapshotNameFormat =
1971        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1972          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1973      final String failSafeSnapshotSnapshotName =
1974        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1975          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1976          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1977      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1978      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1979        if (err != null) {
1980          future.completeExceptionally(err);
1981        } else {
1982          // Step.2 Restore snapshot
1983          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1984            (void2, err2) -> {
1985              if (err2 != null) {
1986                // Step.3.a Something went wrong during the restore and try to rollback.
1987                addListener(
1988                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1989                  (void3, err3) -> {
1990                    if (err3 != null) {
1991                      future.completeExceptionally(err3);
1992                    } else {
1993                      String msg =
1994                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1995                          failSafeSnapshotSnapshotName + " succeeded.";
1996                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1997                    }
1998                  });
1999              } else {
2000                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2001                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2002                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2003                  if (err3 != null) {
2004                    LOG.error(
2005                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2006                      err3);
2007                  }
2008                  future.complete(ret3);
2009                });
2010              }
2011            });
2012        }
2013      });
2014      return future;
2015    } else {
2016      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
2017    }
2018  }
2019
2020  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2021      CompletableFuture<T> parentFuture) {
2022    addListener(parentFuture, (res, err) -> {
2023      if (err != null) {
2024        dependentFuture.completeExceptionally(err);
2025      } else {
2026        dependentFuture.complete(res);
2027      }
2028    });
2029  }
2030
2031  @Override
2032  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2033      boolean restoreAcl) {
2034    CompletableFuture<Void> future = new CompletableFuture<>();
2035    addListener(tableExists(tableName), (exists, err) -> {
2036      if (err != null) {
2037        future.completeExceptionally(err);
2038      } else if (exists) {
2039        future.completeExceptionally(new TableExistsException(tableName));
2040      } else {
2041        completeConditionalOnFuture(future,
2042          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2043      }
2044    });
2045    return future;
2046  }
2047
2048  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2049      boolean restoreAcl) {
2050    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2051      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2052    try {
2053      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2054    } catch (IllegalArgumentException e) {
2055      return failedFuture(e);
2056    }
2057    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2058      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2059        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2060          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2061        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2062      .call());
2063  }
2064
2065  @Override
2066  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2067    return getCompletedSnapshots(null);
2068  }
2069
2070  @Override
2071  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2072    Preconditions.checkNotNull(pattern,
2073      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2074    return getCompletedSnapshots(pattern);
2075  }
2076
2077  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2078    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2079        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2080        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2081          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2082          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2083        .call();
2084  }
2085
2086  @Override
2087  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2088    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2089        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2090    return getCompletedSnapshots(tableNamePattern, null);
2091  }
2092
2093  @Override
2094  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2095      Pattern snapshotNamePattern) {
2096    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2097        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2098    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2099        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2100    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2101  }
2102
2103  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2104      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2105    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2106    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2107      if (err != null) {
2108        future.completeExceptionally(err);
2109        return;
2110      }
2111      if (tableNames == null || tableNames.size() <= 0) {
2112        future.complete(Collections.emptyList());
2113        return;
2114      }
2115      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2116        if (err2 != null) {
2117          future.completeExceptionally(err2);
2118          return;
2119        }
2120        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2121          future.complete(Collections.emptyList());
2122          return;
2123        }
2124        future.complete(snapshotDescList.stream()
2125          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2126          .collect(Collectors.toList()));
2127      });
2128    });
2129    return future;
2130  }
2131
2132  @Override
2133  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2134    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2135  }
2136
2137  @Override
2138  public CompletableFuture<Void> deleteSnapshots() {
2139    return internalDeleteSnapshots(null, null);
2140  }
2141
2142  @Override
2143  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2144    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2145        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2146    return internalDeleteSnapshots(null, snapshotNamePattern);
2147  }
2148
2149  @Override
2150  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2151    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2152        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2153    return internalDeleteSnapshots(tableNamePattern, null);
2154  }
2155
2156  @Override
2157  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2158      Pattern snapshotNamePattern) {
2159    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2160        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2161    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2162        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2163    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2164  }
2165
2166  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2167      Pattern snapshotNamePattern) {
2168    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2169    if (tableNamePattern == null) {
2170      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2171    } else {
2172      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2173    }
2174    CompletableFuture<Void> future = new CompletableFuture<>();
2175    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2176      if (err != null) {
2177        future.completeExceptionally(err);
2178        return;
2179      }
2180      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2181        future.complete(null);
2182        return;
2183      }
2184      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2185        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2186          if (e != null) {
2187            future.completeExceptionally(e);
2188          } else {
2189            future.complete(v);
2190          }
2191        });
2192    }));
2193    return future;
2194  }
2195
2196  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2197    return this
2198        .<Void> newMasterCaller()
2199        .action(
2200          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2201            controller,
2202            stub,
2203            DeleteSnapshotRequest.newBuilder()
2204                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2205                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2206  }
2207
2208  @Override
2209  public CompletableFuture<Void> execProcedure(String signature, String instance,
2210      Map<String, String> props) {
2211    CompletableFuture<Void> future = new CompletableFuture<>();
2212    ProcedureDescription procDesc =
2213      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2214    addListener(this.<Long> newMasterCaller()
2215      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2216        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2217        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2218      .call(), (expectedTimeout, err) -> {
2219        if (err != null) {
2220          future.completeExceptionally(err);
2221          return;
2222        }
2223        TimerTask pollingTask = new TimerTask() {
2224          int tries = 0;
2225          long startTime = EnvironmentEdgeManager.currentTime();
2226          long endTime = startTime + expectedTimeout;
2227          long maxPauseTime = expectedTimeout / maxAttempts;
2228
2229          @Override
2230          public void run(Timeout timeout) throws Exception {
2231            if (EnvironmentEdgeManager.currentTime() < endTime) {
2232              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2233                if (err2 != null) {
2234                  future.completeExceptionally(err2);
2235                  return;
2236                }
2237                if (done) {
2238                  future.complete(null);
2239                } else {
2240                  // retry again after pauseTime.
2241                  long pauseTime =
2242                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2243                  pauseTime = Math.min(pauseTime, maxPauseTime);
2244                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2245                    TimeUnit.MICROSECONDS);
2246                }
2247              });
2248            } else {
2249              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2250                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2251            }
2252          }
2253        };
2254        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2255        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2256      });
2257    return future;
2258  }
2259
2260  @Override
2261  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2262      Map<String, String> props) {
2263    ProcedureDescription proDesc =
2264        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2265    return this.<byte[]> newMasterCaller()
2266        .action(
2267          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2268            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2269            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2270            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2271        .call();
2272  }
2273
2274  @Override
2275  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2276      Map<String, String> props) {
2277    ProcedureDescription proDesc =
2278        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2279    return this.<Boolean> newMasterCaller()
2280        .action((controller, stub) -> this
2281            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2282              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2283              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2284        .call();
2285  }
2286
2287  @Override
2288  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2289    return this.<Boolean> newMasterCaller().action(
2290      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2291        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2292        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2293        .call();
2294  }
2295
2296  @Override
2297  public CompletableFuture<String> getProcedures() {
2298    return this
2299        .<String> newMasterCaller()
2300        .action(
2301          (controller, stub) -> this
2302              .<GetProceduresRequest, GetProceduresResponse, String> call(
2303                controller, stub, GetProceduresRequest.newBuilder().build(),
2304                (s, c, req, done) -> s.getProcedures(c, req, done),
2305                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2306  }
2307
2308  @Override
2309  public CompletableFuture<String> getLocks() {
2310    return this
2311        .<String> newMasterCaller()
2312        .action(
2313          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2314            controller, stub, GetLocksRequest.newBuilder().build(),
2315            (s, c, req, done) -> s.getLocks(c, req, done),
2316            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2317  }
2318
2319  @Override
2320  public CompletableFuture<Void> decommissionRegionServers(
2321      List<ServerName> servers, boolean offload) {
2322    return this.<Void> newMasterCaller()
2323        .action((controller, stub) -> this
2324          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2325            controller, stub,
2326              RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2327            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2328        .call();
2329  }
2330
2331  @Override
2332  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2333    return this.<List<ServerName>> newMasterCaller()
2334        .action((controller, stub) -> this
2335          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2336            List<ServerName>> call(
2337              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2338              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2339              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2340                  .collect(Collectors.toList())))
2341        .call();
2342  }
2343
2344  @Override
2345  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2346      List<byte[]> encodedRegionNames) {
2347    return this.<Void> newMasterCaller()
2348        .action((controller, stub) ->
2349            this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2350                controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2351                    server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2352                        c, req, done), resp -> null)).call();
2353  }
2354
2355  /**
2356   * Get the region location for the passed region name. The region name may be a full region name
2357   * or encoded region name. If the region does not found, then it'll throw an
2358   * UnknownRegionException wrapped by a {@link CompletableFuture}
2359   * @param regionNameOrEncodedRegionName region name or encoded region name
2360   * @return region location, wrapped by a {@link CompletableFuture}
2361   */
2362  @VisibleForTesting
2363  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2364    if (regionNameOrEncodedRegionName == null) {
2365      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2366    }
2367    try {
2368      CompletableFuture<Optional<HRegionLocation>> future;
2369      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2370        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2371        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2372          // old format encodedName, should be meta region
2373          future = connection.registry.getMetaRegionLocations()
2374            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2375              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2376        } else {
2377          future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2378            regionNameOrEncodedRegionName);
2379        }
2380      } else {
2381        RegionInfo regionInfo =
2382          MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2383        if (regionInfo.isMetaRegion()) {
2384          future = connection.registry.getMetaRegionLocations()
2385            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2386              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2387              .findFirst());
2388        } else {
2389          future =
2390            AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2391        }
2392      }
2393
2394      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2395      addListener(future, (location, err) -> {
2396        if (err != null) {
2397          returnedFuture.completeExceptionally(err);
2398          return;
2399        }
2400        if (!location.isPresent() || location.get().getRegion() == null) {
2401          returnedFuture.completeExceptionally(
2402            new UnknownRegionException("Invalid region name or encoded region name: " +
2403              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2404        } else {
2405          returnedFuture.complete(location.get());
2406        }
2407      });
2408      return returnedFuture;
2409    } catch (IOException e) {
2410      return failedFuture(e);
2411    }
2412  }
2413
2414  /**
2415   * Get the region info for the passed region name. The region name may be a full region name or
2416   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2417   * wrapped by a {@link CompletableFuture}
2418   * @return region info, wrapped by a {@link CompletableFuture}
2419   */
2420  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2421    if (regionNameOrEncodedRegionName == null) {
2422      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2423    }
2424
2425    if (Bytes.equals(regionNameOrEncodedRegionName,
2426      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2427      Bytes.equals(regionNameOrEncodedRegionName,
2428        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2429      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2430    }
2431
2432    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2433    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2434      if (err != null) {
2435        future.completeExceptionally(err);
2436      } else {
2437        future.complete(location.getRegion());
2438      }
2439    });
2440    return future;
2441  }
2442
2443  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2444    if (numRegions < 3) {
2445      throw new IllegalArgumentException("Must create at least three regions");
2446    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2447      throw new IllegalArgumentException("Start key must be smaller than end key");
2448    }
2449    if (numRegions == 3) {
2450      return new byte[][] { startKey, endKey };
2451    }
2452    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2453    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2454      throw new IllegalArgumentException("Unable to split key range into enough regions");
2455    }
2456    return splitKeys;
2457  }
2458
2459  private void verifySplitKeys(byte[][] splitKeys) {
2460    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2461    // Verify there are no duplicate split keys
2462    byte[] lastKey = null;
2463    for (byte[] splitKey : splitKeys) {
2464      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2465        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2466      }
2467      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2468        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2469            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2470      }
2471      lastKey = splitKey;
2472    }
2473  }
2474
2475  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2476
2477    abstract void onFinished();
2478
2479    abstract void onError(Throwable error);
2480
2481    @Override
2482    public void accept(Void v, Throwable error) {
2483      if (error != null) {
2484        onError(error);
2485        return;
2486      }
2487      onFinished();
2488    }
2489  }
2490
2491  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2492    protected final TableName tableName;
2493
2494    TableProcedureBiConsumer(TableName tableName) {
2495      this.tableName = tableName;
2496    }
2497
2498    abstract String getOperationType();
2499
2500    String getDescription() {
2501      return "Operation: " + getOperationType() + ", " + "Table Name: "
2502          + tableName.getNameWithNamespaceInclAsString();
2503    }
2504
2505    @Override
2506    void onFinished() {
2507      LOG.info(getDescription() + " completed");
2508    }
2509
2510    @Override
2511    void onError(Throwable error) {
2512      LOG.info(getDescription() + " failed with " + error.getMessage());
2513    }
2514  }
2515
2516  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2517    protected final String namespaceName;
2518
2519    NamespaceProcedureBiConsumer(String namespaceName) {
2520      this.namespaceName = namespaceName;
2521    }
2522
2523    abstract String getOperationType();
2524
2525    String getDescription() {
2526      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2527    }
2528
2529    @Override
2530    void onFinished() {
2531      LOG.info(getDescription() + " completed");
2532    }
2533
2534    @Override
2535    void onError(Throwable error) {
2536      LOG.info(getDescription() + " failed with " + error.getMessage());
2537    }
2538  }
2539
2540  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2541
2542    CreateTableProcedureBiConsumer(TableName tableName) {
2543      super(tableName);
2544    }
2545
2546    @Override
2547    String getOperationType() {
2548      return "CREATE";
2549    }
2550  }
2551
2552  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2553
2554    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2555      super(tableName);
2556    }
2557
2558    @Override
2559    String getOperationType() {
2560      return "ENABLE";
2561    }
2562  }
2563
2564  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2565
2566    DeleteTableProcedureBiConsumer(TableName tableName) {
2567      super(tableName);
2568    }
2569
2570    @Override
2571    String getOperationType() {
2572      return "DELETE";
2573    }
2574
2575    @Override
2576    void onFinished() {
2577      connection.getLocator().clearCache(this.tableName);
2578      super.onFinished();
2579    }
2580  }
2581
2582  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2583
2584    TruncateTableProcedureBiConsumer(TableName tableName) {
2585      super(tableName);
2586    }
2587
2588    @Override
2589    String getOperationType() {
2590      return "TRUNCATE";
2591    }
2592  }
2593
2594  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2595
2596    EnableTableProcedureBiConsumer(TableName tableName) {
2597      super(tableName);
2598    }
2599
2600    @Override
2601    String getOperationType() {
2602      return "ENABLE";
2603    }
2604  }
2605
2606  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2607
2608    DisableTableProcedureBiConsumer(TableName tableName) {
2609      super(tableName);
2610    }
2611
2612    @Override
2613    String getOperationType() {
2614      return "DISABLE";
2615    }
2616  }
2617
2618  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2619
2620    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2621      super(tableName);
2622    }
2623
2624    @Override
2625    String getOperationType() {
2626      return "ADD_COLUMN_FAMILY";
2627    }
2628  }
2629
2630  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2631
2632    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2633      super(tableName);
2634    }
2635
2636    @Override
2637    String getOperationType() {
2638      return "DELETE_COLUMN_FAMILY";
2639    }
2640  }
2641
2642  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2643
2644    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2645      super(tableName);
2646    }
2647
2648    @Override
2649    String getOperationType() {
2650      return "MODIFY_COLUMN_FAMILY";
2651    }
2652  }
2653
2654  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2655
2656    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2657      super(namespaceName);
2658    }
2659
2660    @Override
2661    String getOperationType() {
2662      return "CREATE_NAMESPACE";
2663    }
2664  }
2665
2666  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2667
2668    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2669      super(namespaceName);
2670    }
2671
2672    @Override
2673    String getOperationType() {
2674      return "DELETE_NAMESPACE";
2675    }
2676  }
2677
2678  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2679
2680    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2681      super(namespaceName);
2682    }
2683
2684    @Override
2685    String getOperationType() {
2686      return "MODIFY_NAMESPACE";
2687    }
2688  }
2689
2690  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2691
2692    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2693      super(tableName);
2694    }
2695
2696    @Override
2697    String getOperationType() {
2698      return "MERGE_REGIONS";
2699    }
2700  }
2701
2702  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2703
2704    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2705      super(tableName);
2706    }
2707
2708    @Override
2709    String getOperationType() {
2710      return "SPLIT_REGION";
2711    }
2712  }
2713
2714  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2715    private final String peerId;
2716    private final Supplier<String> getOperation;
2717
2718    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2719      this.peerId = peerId;
2720      this.getOperation = getOperation;
2721    }
2722
2723    String getDescription() {
2724      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2725    }
2726
2727    @Override
2728    void onFinished() {
2729      LOG.info(getDescription() + " completed");
2730    }
2731
2732    @Override
2733    void onError(Throwable error) {
2734      LOG.info(getDescription() + " failed with " + error.getMessage());
2735    }
2736  }
2737
2738  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2739    CompletableFuture<Void> future = new CompletableFuture<>();
2740    addListener(procFuture, (procId, error) -> {
2741      if (error != null) {
2742        future.completeExceptionally(error);
2743        return;
2744      }
2745      getProcedureResult(procId, future, 0);
2746    });
2747    return future;
2748  }
2749
2750  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2751    addListener(
2752      this.<GetProcedureResultResponse> newMasterCaller()
2753        .action((controller, stub) -> this
2754          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2755            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2756            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2757        .call(),
2758      (response, error) -> {
2759        if (error != null) {
2760          LOG.warn("failed to get the procedure result procId={}", procId,
2761            ConnectionUtils.translateException(error));
2762          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2763            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2764          return;
2765        }
2766        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2767          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2768            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2769          return;
2770        }
2771        if (response.hasException()) {
2772          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2773          future.completeExceptionally(ioe);
2774        } else {
2775          future.complete(null);
2776        }
2777      });
2778  }
2779
2780  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2781    CompletableFuture<T> future = new CompletableFuture<>();
2782    future.completeExceptionally(error);
2783    return future;
2784  }
2785
2786  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2787    if (error != null) {
2788      future.completeExceptionally(error);
2789      return true;
2790    }
2791    return false;
2792  }
2793
2794  @Override
2795  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2796    return getClusterMetrics(EnumSet.allOf(Option.class));
2797  }
2798
2799  @Override
2800  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2801    return this
2802        .<ClusterMetrics> newMasterCaller()
2803        .action(
2804          (controller, stub) -> this
2805              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2806                stub, RequestConverter.buildGetClusterStatusRequest(options),
2807                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2808                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2809  }
2810
2811  @Override
2812  public CompletableFuture<Void> shutdown() {
2813    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2814      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2815        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2816        resp -> null))
2817      .call();
2818  }
2819
2820  @Override
2821  public CompletableFuture<Void> stopMaster() {
2822    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2823      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2824        controller, stub, StopMasterRequest.newBuilder().build(),
2825        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2826      .call();
2827  }
2828
2829  @Override
2830  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2831    StopServerRequest request = RequestConverter
2832      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2833    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2834      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2835        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2836        resp -> null))
2837      .serverName(serverName).call();
2838  }
2839
2840  @Override
2841  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2842    return this
2843        .<Void> newAdminCaller()
2844        .action(
2845          (controller, stub) -> this
2846              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2847                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2848                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2849        .serverName(serverName).call();
2850  }
2851
2852  @Override
2853  public CompletableFuture<Void> updateConfiguration() {
2854    CompletableFuture<Void> future = new CompletableFuture<Void>();
2855    addListener(
2856      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2857      (status, err) -> {
2858        if (err != null) {
2859          future.completeExceptionally(err);
2860        } else {
2861          List<CompletableFuture<Void>> futures = new ArrayList<>();
2862          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2863          futures.add(updateConfiguration(status.getMasterName()));
2864          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2865          addListener(
2866            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2867            (result, err2) -> {
2868              if (err2 != null) {
2869                future.completeExceptionally(err2);
2870              } else {
2871                future.complete(result);
2872              }
2873            });
2874        }
2875      });
2876    return future;
2877  }
2878
2879  @Override
2880  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2881    return this
2882        .<Void> newAdminCaller()
2883        .action(
2884          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2885            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2886            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2887        .serverName(serverName).call();
2888  }
2889
2890  @Override
2891  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2892    return this
2893        .<Void> newAdminCaller()
2894        .action(
2895          (controller, stub) -> this
2896              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2897                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2898                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2899        .serverName(serverName).call();
2900  }
2901
2902  @Override
2903  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2904    return this
2905        .<List<SecurityCapability>> newMasterCaller()
2906        .action(
2907          (controller, stub) -> this
2908              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
2909                  call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
2910                    (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2911                    (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2912        .call();
2913  }
2914
2915  @Override
2916  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2917    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2918  }
2919
2920  @Override
2921  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2922      TableName tableName) {
2923    Preconditions.checkNotNull(tableName,
2924      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2925    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2926  }
2927
2928  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2929      ServerName serverName) {
2930    return this.<List<RegionMetrics>> newAdminCaller()
2931        .action((controller, stub) -> this
2932            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2933              adminCall(controller, stub, request, (s, c, req, done) ->
2934                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2935        .serverName(serverName).call();
2936  }
2937
2938  @Override
2939  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2940    return this
2941        .<Boolean> newMasterCaller()
2942        .action(
2943          (controller, stub) -> this
2944              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2945                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2946                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2947                resp -> resp.getInMaintenanceMode())).call();
2948  }
2949
  /**
   * Computes the compaction state of a table.
   * <ul>
   * <li>{@code MOB}: asks the active master for the region info of the table's MOB region and
   * reports that region's compaction state ({@code NONE} when the reply carries none).</li>
   * <li>{@code NORMAL}: queries every located, non-offline region of the table and combines the
   * per-region states: {@code MAJOR_AND_MINOR} if any region reports it or if both
   * {@code MAJOR} and {@code MINOR} are seen, otherwise the state that was seen, or
   * {@code NONE}.</li>
   * </ul>
   * @param tableName the table to inspect
   * @param compactType which kind of compaction to report on
   * @return a future holding the table-level compaction state
   * @throws IllegalArgumentException synchronously, if {@code compactType} is not handled
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
      CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        // The MOB region info request is sent to the active master.
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this
              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
                controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  // No compaction state in the reply is reported as NONE.
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // Filled from the per-region callbacks below, hence a concurrent collection.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          // Only regions with a server, a region info and not marked offline are queried.
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // If any region compaction state is MAJOR_AND_MINOR
                // the table compaction state is MAJOR_AND_MINOR, too.
                // The first failure or MAJOR_AND_MINOR result wins; later completions of the
                // already-completed future have no effect.
                if (err2 != null) {
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If the future is not completed yet, check all regions' compaction states.
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      // MAJOR on one region plus MINOR on another means MAJOR_AND_MINOR.
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      // MINOR on one region plus MAJOR on another means MAJOR_AND_MINOR.
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // Not completed inside the loop: report whatever single state was seen.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }
3045
3046  @Override
3047  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3048    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3049    addListener(getRegionLocation(regionName), (location, err) -> {
3050      if (err != null) {
3051        future.completeExceptionally(err);
3052        return;
3053      }
3054      ServerName serverName = location.getServerName();
3055      if (serverName == null) {
3056        future
3057          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3058        return;
3059      }
3060      addListener(
3061        this.<GetRegionInfoResponse> newAdminCaller()
3062          .action((controller, stub) -> this
3063            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3064              controller, stub,
3065              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3066                true),
3067              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3068          .serverName(serverName).call(),
3069        (resp2, err2) -> {
3070          if (err2 != null) {
3071            future.completeExceptionally(err2);
3072          } else {
3073            if (resp2.hasCompactionState()) {
3074              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3075            } else {
3076              future.complete(CompactionState.NONE);
3077            }
3078          }
3079        });
3080    });
3081    return future;
3082  }
3083
3084  @Override
3085  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3086    MajorCompactionTimestampRequest request =
3087        MajorCompactionTimestampRequest.newBuilder()
3088            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3089    return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3090        this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3091            call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3092                c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3093  }
3094
3095  @Override
3096  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3097      byte[] regionName) {
3098    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3099    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3100    addListener(getRegionInfo(regionName), (region, err) -> {
3101      if (err != null) {
3102        future.completeExceptionally(err);
3103        return;
3104      }
3105      MajorCompactionTimestampForRegionRequest.Builder builder =
3106        MajorCompactionTimestampForRegionRequest.newBuilder();
3107      builder.setRegion(
3108        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3109      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3110        .<MajorCompactionTimestampForRegionRequest,
3111        MajorCompactionTimestampResponse, Optional<Long>> call(
3112          controller, stub, builder.build(),
3113          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3114          ProtobufUtil::toOptionalTimestamp))
3115        .call(), (timestamp, err2) -> {
3116          if (err2 != null) {
3117            future.completeExceptionally(err2);
3118          } else {
3119            future.complete(timestamp);
3120          }
3121        });
3122    });
3123    return future;
3124  }
3125
3126  @Override
3127  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3128      List<String> serverNamesList) {
3129    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3130    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3131      if (err != null) {
3132        future.completeExceptionally(err);
3133        return;
3134      }
3135      // Accessed by multiple threads.
3136      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3137      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3138      serverNames.stream().forEach(serverName -> {
3139        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3140          if (err2 != null) {
3141            future.completeExceptionally(unwrapCompletionException(err2));
3142          } else {
3143            serverStates.put(serverName, serverState);
3144          }
3145        }));
3146      });
3147      addListener(
3148        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3149        (ret, err3) -> {
3150          if (!future.isCompletedExceptionally()) {
3151            if (err3 != null) {
3152              future.completeExceptionally(err3);
3153            } else {
3154              future.complete(serverStates);
3155            }
3156          }
3157        });
3158    });
3159    return future;
3160  }
3161
3162  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3163    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3164    if (serverNamesList.isEmpty()) {
3165      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3166        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3167      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3168        if (err != null) {
3169          future.completeExceptionally(err);
3170        } else {
3171          future.complete(clusterMetrics.getServersName());
3172        }
3173      });
3174      return future;
3175    } else {
3176      List<ServerName> serverList = new ArrayList<>();
3177      for (String regionServerName : serverNamesList) {
3178        ServerName serverName = null;
3179        try {
3180          serverName = ServerName.valueOf(regionServerName);
3181        } catch (Exception e) {
3182          future.completeExceptionally(
3183            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3184        }
3185        if (serverName == null) {
3186          future.completeExceptionally(
3187            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3188        } else {
3189          serverList.add(serverName);
3190        }
3191      }
3192      future.complete(serverList);
3193    }
3194    return future;
3195  }
3196
3197  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3198    return this
3199        .<Boolean>newAdminCaller()
3200        .serverName(serverName)
3201        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3202            Boolean>adminCall(controller, stub,
3203            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3204            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3205  }
3206
3207  @Override
3208  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3209    return this.<Boolean> newMasterCaller()
3210      .action((controller, stub) -> this
3211        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3212          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3213          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3214          (resp) -> resp.getPrevBalanceValue()))
3215      .call();
3216  }
3217
3218  @Override
3219  public CompletableFuture<Boolean> balance(boolean forcible) {
3220    return this
3221        .<Boolean> newMasterCaller()
3222        .action(
3223          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3224            stub, RequestConverter.buildBalanceRequest(forcible),
3225            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3226  }
3227
3228  @Override
3229  public CompletableFuture<Boolean> isBalancerEnabled() {
3230    return this
3231        .<Boolean> newMasterCaller()
3232        .action((controller, stub) ->
3233              this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3234            stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3235                  -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3236  }
3237
3238  @Override
3239  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3240    return this
3241        .<Boolean> newMasterCaller()
3242        .action(
3243          (controller, stub) -> this
3244              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3245                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3246                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3247                    .getPrevNormalizerValue())).call();
3248  }
3249
3250  @Override
3251  public CompletableFuture<Boolean> isNormalizerEnabled() {
3252    return this
3253        .<Boolean> newMasterCaller()
3254        .action(
3255          (controller, stub) -> this
3256              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3257                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3258                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3259                (resp) -> resp.getEnabled())).call();
3260  }
3261
3262  @Override
3263  public CompletableFuture<Boolean> normalize() {
3264    return this
3265        .<Boolean> newMasterCaller()
3266        .action(
3267          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3268            controller, stub, RequestConverter.buildNormalizeRequest(),
3269            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3270        .call();
3271  }
3272
3273  @Override
3274  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3275    return this
3276        .<Boolean> newMasterCaller()
3277        .action(
3278          (controller, stub) -> this
3279              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3280                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3281                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3282                    .getPrevValue())).call();
3283  }
3284
3285  @Override
3286  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3287    return this
3288        .<Boolean> newMasterCaller()
3289        .action(
3290          (controller, stub) -> this
3291              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3292                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3293                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3294        .call();
3295  }
3296
3297  @Override
3298  public CompletableFuture<Boolean> runCleanerChore() {
3299    return this
3300        .<Boolean> newMasterCaller()
3301        .action(
3302          (controller, stub) -> this
3303              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3304                RequestConverter.buildRunCleanerChoreRequest(),
3305                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3306                (resp) -> resp.getCleanerChoreRan())).call();
3307  }
3308
3309  @Override
3310  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3311    return this
3312        .<Boolean> newMasterCaller()
3313        .action(
3314          (controller, stub) -> this
3315              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3316                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3317                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3318                    .getPrevValue())).call();
3319  }
3320
3321  @Override
3322  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3323    return this
3324        .<Boolean> newMasterCaller()
3325        .action(
3326          (controller, stub) -> this
3327              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3328                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3329                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3330                    .getValue())).call();
3331  }
3332
3333  @Override
3334  public CompletableFuture<Integer> runCatalogJanitor() {
3335    return this
3336        .<Integer> newMasterCaller()
3337        .action(
3338          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3339            controller, stub, RequestConverter.buildCatalogScanRequest(),
3340            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3341        .call();
3342  }
3343
3344  @Override
3345  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3346      ServiceCaller<S, R> callable) {
3347    MasterCoprocessorRpcChannelImpl channel =
3348        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3349    S stub = stubMaker.apply(channel);
3350    CompletableFuture<R> future = new CompletableFuture<>();
3351    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3352    callable.call(stub, controller, resp -> {
3353      if (controller.failed()) {
3354        future.completeExceptionally(controller.getFailed());
3355      } else {
3356        future.complete(resp);
3357      }
3358    });
3359    return future;
3360  }
3361
3362  @Override
3363  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3364      ServiceCaller<S, R> callable, ServerName serverName) {
3365    RegionServerCoprocessorRpcChannelImpl channel =
3366        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3367          serverName));
3368    S stub = stubMaker.apply(channel);
3369    CompletableFuture<R> future = new CompletableFuture<>();
3370    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3371    callable.call(stub, controller, resp -> {
3372      if (controller.failed()) {
3373        future.completeExceptionally(controller.getFailed());
3374      } else {
3375        future.complete(resp);
3376      }
3377    });
3378    return future;
3379  }
3380
3381  @Override
3382  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3383    return this.<List<ServerName>> newMasterCaller()
3384      .action((controller, stub) -> this
3385        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3386          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3387          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3388          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3389      .call();
3390  }
3391
3392  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3393    return this.connection.callerFactory.<T> serverRequest()
3394      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3395      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3396      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3397      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3398  }
3399
3400  @Override
3401  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3402    if (tableName == null) {
3403      return failedFuture(new IllegalArgumentException("Table name is null"));
3404    }
3405    CompletableFuture<Void> future = new CompletableFuture<>();
3406    addListener(tableExists(tableName), (exist, err) -> {
3407      if (err != null) {
3408        future.completeExceptionally(err);
3409        return;
3410      }
3411      if (!exist) {
3412        future.completeExceptionally(new TableNotFoundException(
3413          "Table '" + tableName.getNameAsString() + "' does not exists."));
3414        return;
3415      }
3416      addListener(getTableSplits(tableName), (splits, err1) -> {
3417        if (err1 != null) {
3418          future.completeExceptionally(err1);
3419        } else {
3420          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3421            if (err2 != null) {
3422              future.completeExceptionally(err2);
3423            } else {
3424              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3425                if (err3 != null) {
3426                  future.completeExceptionally(err3);
3427                } else {
3428                  future.complete(result3);
3429                }
3430              });
3431            }
3432          });
3433        }
3434      });
3435    });
3436    return future;
3437  }
3438
3439  @Override
3440  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3441    if (tableName == null) {
3442      return failedFuture(new IllegalArgumentException("Table name is null"));
3443    }
3444    CompletableFuture<Void> future = new CompletableFuture<>();
3445    addListener(tableExists(tableName), (exist, err) -> {
3446      if (err != null) {
3447        future.completeExceptionally(err);
3448        return;
3449      }
3450      if (!exist) {
3451        future.completeExceptionally(new TableNotFoundException(
3452          "Table '" + tableName.getNameAsString() + "' does not exists."));
3453        return;
3454      }
3455      addListener(setTableReplication(tableName, false), (result, err2) -> {
3456        if (err2 != null) {
3457          future.completeExceptionally(err2);
3458        } else {
3459          future.complete(result);
3460        }
3461      });
3462    });
3463    return future;
3464  }
3465
3466  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3467    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3468    addListener(
3469      getRegions(tableName).thenApply(regions -> regions.stream()
3470        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3471      (regions, err2) -> {
3472        if (err2 != null) {
3473          future.completeExceptionally(err2);
3474          return;
3475        }
3476        if (regions.size() == 1) {
3477          future.complete(null);
3478        } else {
3479          byte[][] splits = new byte[regions.size() - 1][];
3480          for (int i = 1; i < regions.size(); i++) {
3481            splits[i - 1] = regions.get(i).getStartKey();
3482          }
3483          future.complete(splits);
3484        }
3485      });
3486    return future;
3487  }
3488
3489  /**
3490   * Connect to peer and check the table descriptor on peer:
3491   * <ol>
3492   * <li>Create the same table on peer when not exist.</li>
3493   * <li>Throw an exception if the table already has replication enabled on any of the column
3494   * families.</li>
3495   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3496   * </ol>
3497   * @param tableName name of the table to sync to the peer
3498   * @param splits table split keys
3499   */
3500  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3501      byte[][] splits) {
3502    CompletableFuture<Void> future = new CompletableFuture<>();
3503    addListener(listReplicationPeers(), (peers, err) -> {
3504      if (err != null) {
3505        future.completeExceptionally(err);
3506        return;
3507      }
3508      if (peers == null || peers.size() <= 0) {
3509        future.completeExceptionally(
3510          new IllegalArgumentException("Found no peer cluster for replication."));
3511        return;
3512      }
3513      List<CompletableFuture<Void>> futures = new ArrayList<>();
3514      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3515        .forEach(peer -> {
3516          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3517        });
3518      addListener(
3519        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3520        (result, err2) -> {
3521          if (err2 != null) {
3522            future.completeExceptionally(err2);
3523          } else {
3524            future.complete(result);
3525          }
3526        });
3527    });
3528    return future;
3529  }
3530
3531  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3532      ReplicationPeerDescription peer) {
3533    Configuration peerConf = null;
3534    try {
3535      peerConf =
3536        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3537    } catch (IOException e) {
3538      return failedFuture(e);
3539    }
3540    CompletableFuture<Void> future = new CompletableFuture<>();
3541    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3542      if (err != null) {
3543        future.completeExceptionally(err);
3544        return;
3545      }
3546      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3547        if (err1 != null) {
3548          future.completeExceptionally(err1);
3549          return;
3550        }
3551        AsyncAdmin peerAdmin = conn.getAdmin();
3552        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3553          if (err2 != null) {
3554            future.completeExceptionally(err2);
3555            return;
3556          }
3557          if (!exist) {
3558            CompletableFuture<Void> createTableFuture = null;
3559            if (splits == null) {
3560              createTableFuture = peerAdmin.createTable(tableDesc);
3561            } else {
3562              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3563            }
3564            addListener(createTableFuture, (result, err3) -> {
3565              if (err3 != null) {
3566                future.completeExceptionally(err3);
3567              } else {
3568                future.complete(result);
3569              }
3570            });
3571          } else {
3572            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3573              (result, err4) -> {
3574                if (err4 != null) {
3575                  future.completeExceptionally(err4);
3576                } else {
3577                  future.complete(result);
3578                }
3579              });
3580          }
3581        });
3582      });
3583    });
3584    return future;
3585  }
3586
3587  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3588      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3589    CompletableFuture<Void> future = new CompletableFuture<>();
3590    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3591      if (err != null) {
3592        future.completeExceptionally(err);
3593        return;
3594      }
3595      if (peerTableDesc == null) {
3596        future.completeExceptionally(
3597          new IllegalArgumentException("Failed to get table descriptor for table " +
3598            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3599        return;
3600      }
3601      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3602        future.completeExceptionally(new IllegalArgumentException(
3603          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3604            ", but the table descriptors are not same when compared with source cluster." +
3605            " Thus can not enable the table's replication switch."));
3606        return;
3607      }
3608      future.complete(null);
3609    });
3610    return future;
3611  }
3612
3613  /**
3614   * Set the table's replication switch if the table's replication switch is already not set.
3615   * @param tableName name of the table
3616   * @param enableRep is replication switch enable or disable
3617   */
3618  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3619    CompletableFuture<Void> future = new CompletableFuture<>();
3620    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3621      if (err != null) {
3622        future.completeExceptionally(err);
3623        return;
3624      }
3625      if (!tableDesc.matchReplicationScope(enableRep)) {
3626        int scope =
3627          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3628        TableDescriptor newTableDesc =
3629          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3630        addListener(modifyTable(newTableDesc), (result, err2) -> {
3631          if (err2 != null) {
3632            future.completeExceptionally(err2);
3633          } else {
3634            future.complete(result);
3635          }
3636        });
3637      } else {
3638        future.complete(null);
3639      }
3640    });
3641    return future;
3642  }
3643
3644  @Override
3645  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3646    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3647    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3648      if (err != null) {
3649        future.completeExceptionally(err);
3650        return;
3651      }
3652      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3653        locations.stream().filter(l -> l.getRegion() != null)
3654          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3655          .collect(Collectors.groupingBy(l -> l.getServerName(),
3656            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3657      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3658      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3659      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3660        futures
3661          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3662            if (err2 != null) {
3663              future.completeExceptionally(unwrapCompletionException(err2));
3664            } else {
3665              aggregator.append(stats);
3666            }
3667          }));
3668      }
3669      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3670        (ret, err3) -> {
3671          if (err3 != null) {
3672            future.completeExceptionally(unwrapCompletionException(err3));
3673          } else {
3674            future.complete(aggregator.sum());
3675          }
3676        });
3677    });
3678    return future;
3679  }
3680
3681  @Override
3682  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3683      boolean preserveSplits) {
3684    CompletableFuture<Void> future = new CompletableFuture<>();
3685    addListener(tableExists(tableName), (exist, err) -> {
3686      if (err != null) {
3687        future.completeExceptionally(err);
3688        return;
3689      }
3690      if (!exist) {
3691        future.completeExceptionally(new TableNotFoundException(tableName));
3692        return;
3693      }
3694      addListener(tableExists(newTableName), (exist1, err1) -> {
3695        if (err1 != null) {
3696          future.completeExceptionally(err1);
3697          return;
3698        }
3699        if (exist1) {
3700          future.completeExceptionally(new TableExistsException(newTableName));
3701          return;
3702        }
3703        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3704          if (err2 != null) {
3705            future.completeExceptionally(err2);
3706            return;
3707          }
3708          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3709          if (preserveSplits) {
3710            addListener(getTableSplits(tableName), (splits, err3) -> {
3711              if (err3 != null) {
3712                future.completeExceptionally(err3);
3713              } else {
3714                addListener(
3715                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3716                  (result, err4) -> {
3717                    if (err4 != null) {
3718                      future.completeExceptionally(err4);
3719                    } else {
3720                      future.complete(result);
3721                    }
3722                  });
3723              }
3724            });
3725          } else {
3726            addListener(createTable(newTableDesc), (result, err5) -> {
3727              if (err5 != null) {
3728                future.completeExceptionally(err5);
3729              } else {
3730                future.complete(result);
3731              }
3732            });
3733          }
3734        });
3735      });
3736    });
3737    return future;
3738  }
3739
3740  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3741      List<RegionInfo> hris) {
3742    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3743      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3744        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3745        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3746        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3747      .serverName(serverName).call();
3748  }
3749
3750  @Override
3751  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3752    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3753        .action((controller, stub) -> this
3754            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3755              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3756              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3757              resp -> resp.getPreviousRpcThrottleEnabled()))
3758        .call();
3759    return future;
3760  }
3761
3762  @Override
3763  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3764    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3765        .action((controller, stub) -> this
3766            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3767              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3768              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3769              resp -> resp.getRpcThrottleEnabled()))
3770        .call();
3771    return future;
3772  }
3773
3774  @Override
3775  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3776    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3777        .action((controller, stub) -> this
3778            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3779              controller, stub,
3780              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3781                  .build(),
3782              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3783              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3784        .call();
3785    return future;
3786  }
3787
3788  @Override
3789  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3790    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3791      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3792      Map<TableName, Long>> call(controller, stub,
3793        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3794        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3795        resp -> resp.getSizesList().stream().collect(Collectors
3796          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3797      .call();
3798  }
3799
3800  @Override
3801  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3802      ServerName serverName) {
3803    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3804      .action((controller, stub) -> this
3805        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3806        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3807          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3808          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3809          resp -> resp.getSnapshotsList().stream()
3810            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3811              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3812      .serverName(serverName).call();
3813  }
3814
3815  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3816      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3817    return this.<SpaceQuotaSnapshot> newMasterCaller()
3818      .action((controller, stub) -> this
3819        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3820          RequestConverter.buildGetQuotaStatesRequest(),
3821          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3822      .call();
3823  }
3824
3825  @Override
3826  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3827    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3828      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3829      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3830  }
3831
3832  @Override
3833  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3834    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3835    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3836      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3837      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3838  }
3839
3840  @Override
3841  public CompletableFuture<Void> grant(UserPermission userPermission,
3842      boolean mergeExistingPermissions) {
3843    return this.<Void> newMasterCaller()
3844        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3845          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3846          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3847        .call();
3848  }
3849
3850  @Override
3851  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3852    return this.<Void> newMasterCaller()
3853        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3854          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3855          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3856        .call();
3857  }
3858
3859  @Override
3860  public CompletableFuture<List<UserPermission>>
3861      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3862    return this.<List<UserPermission>> newMasterCaller().action((controller,
3863        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3864            List<UserPermission>> call(controller, stub,
3865              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3866              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3867              resp -> resp.getUserPermissionList().stream()
3868                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3869                .collect(Collectors.toList())))
3870        .call();
3871  }
3872
3873  @Override
3874  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3875      List<Permission> permissions) {
3876    return this.<List<Boolean>> newMasterCaller()
3877        .action((controller, stub) -> this
3878            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3879              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3880              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3881              resp -> resp.getHasUserPermissionList()))
3882        .call();
3883  }
3884
3885  @Override
3886  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3887      final boolean sync) {
3888    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3889        .call(controller, stub, RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3890            MasterService.Interface::switchSnapshotCleanup,
3891            SetSnapshotCleanupResponse::getPrevSnapshotCleanup)).call();
3892  }
3893
3894  @Override
3895  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3896    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3897        .call(controller, stub, RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3898            MasterService.Interface::isSnapshotCleanupEnabled,
3899            IsSnapshotCleanupEnabledResponse::getEnabled)).call();
3900  }
3901
3902  @Override
3903  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
3904    return this.<Void> newMasterCaller()
3905        .action((controller, stub) -> this.
3906            <MoveServersRequest, MoveServersResponse, Void> call(controller, stub,
3907                RequestConverter.buildMoveServersRequest(servers, groupName),
3908              (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
3909        .call();
3910  }
3911
3912  @Override
3913  public CompletableFuture<Void> addRSGroup(String groupName) {
3914    return this.<Void> newMasterCaller()
3915        .action(((controller, stub) -> this.
3916            <AddRSGroupRequest, AddRSGroupResponse, Void> call(controller, stub,
3917                AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3918              (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)))
3919        .call();
3920  }
3921
3922  @Override
3923  public CompletableFuture<Void> removeRSGroup(String groupName) {
3924    return this.<Void> newMasterCaller()
3925        .action((controller, stub) -> this.
3926            <RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(controller, stub,
3927                RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3928              (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
3929        .call();
3930  }
3931
3932  @Override
3933  public CompletableFuture<Boolean> balanceRSGroup(String groupName) {
3934    return this.<Boolean> newMasterCaller()
3935        .action((controller, stub) -> this.
3936            <BalanceRSGroupRequest, BalanceRSGroupResponse, Boolean> call(controller, stub,
3937                BalanceRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3938              (s, c, req, done) -> s.balanceRSGroup(c, req, done), resp -> resp.getBalanceRan()))
3939        .call();
3940  }
3941
3942  @Override
3943  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
3944    return this.<List<RSGroupInfo>> newMasterCaller()
3945        .action((controller, stub) -> this
3946            .<ListRSGroupInfosRequest, ListRSGroupInfosResponse, List<RSGroupInfo>> call(
3947                controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
3948              (s, c, req, done) -> s.listRSGroupInfos(c, req, done),
3949              resp -> resp.getRSGroupInfoList().stream()
3950                  .map(r -> ProtobufUtil.toGroupInfo(r))
3951                  .collect(Collectors.toList())))
3952        .call();
3953  }
3954
3955  @Override
3956  public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
3957      @Nullable final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
3958    if (CollectionUtils.isEmpty(serverNames)) {
3959      return CompletableFuture.completedFuture(Collections.emptyList());
3960    }
3961    if (logQueryFilter.getType() == null
3962        || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
3963      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3964        .map((ServerName serverName) ->
3965          getSlowLogResponseFromServer(serverName, logQueryFilter))
3966        .map(CompletableFuture::join)
3967        .flatMap(List::stream)
3968        .collect(Collectors.toList()));
3969    } else {
3970      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3971        .map((ServerName serverName) ->
3972          getLargeLogResponseFromServer(serverName, logQueryFilter))
3973        .map(CompletableFuture::join)
3974        .flatMap(List::stream)
3975        .collect(Collectors.toList()));
3976    }
3977  }
3978
3979  private CompletableFuture<List<OnlineLogRecord>> getSlowLogResponseFromServer(
3980      final ServerName serverName, final LogQueryFilter logQueryFilter) {
3981    return this.<List<OnlineLogRecord>>newAdminCaller()
3982      .action((controller, stub) -> this
3983        .adminCall(
3984          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
3985          AdminService.Interface::getSlowLogResponses,
3986          ProtobufUtil::toSlowLogPayloads))
3987      .serverName(serverName).call();
3988  }
3989
3990  private CompletableFuture<List<OnlineLogRecord>> getLargeLogResponseFromServer(
3991    final ServerName serverName, final LogQueryFilter logQueryFilter) {
3992    return this.<List<OnlineLogRecord>>newAdminCaller()
3993      .action((controller, stub) -> this
3994        .adminCall(
3995          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
3996          AdminService.Interface::getLargeLogResponses,
3997          ProtobufUtil::toSlowLogPayloads))
3998      .serverName(serverName).call();
3999  }
4000
4001  @Override
4002  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
4003      @Nullable Set<ServerName> serverNames) {
4004    if (CollectionUtils.isEmpty(serverNames)) {
4005      return CompletableFuture.completedFuture(Collections.emptyList());
4006    }
4007    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
4008      .map(this::clearSlowLogsResponses)
4009      .collect(Collectors.toList());
4010    return convertToFutureOfList(clearSlowLogResponseList);
4011  }
4012
4013  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4014    return this.<Boolean>newAdminCaller()
4015      .action(((controller, stub) -> this
4016        .adminCall(
4017          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
4018          AdminService.Interface::clearSlowLogsResponses,
4019          ProtobufUtil::toClearSlowLogPayload))
4020      ).serverName(serverName).call();
4021  }
4022
4023  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
4024    List<CompletableFuture<T>> futures) {
4025    CompletableFuture<Void> allDoneFuture =
4026      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4027    return allDoneFuture.thenApply(v ->
4028      futures.stream()
4029        .map(CompletableFuture::join)
4030        .collect(Collectors.toList())
4031    );
4032  }
4033
4034  @Override
4035  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4036    return this.<List<TableName>> newMasterCaller()
4037      .action((controller, stub) -> this
4038        .<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, List<TableName>> call(controller,
4039          stub, ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4040          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4041            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4042      .call();
4043  }
4044
4045  @Override
4046  public CompletableFuture<Pair<List<String>, List<TableName>>>
4047    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4048    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4049      .action((controller, stub) -> this
4050        .<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4051          GetConfiguredNamespacesAndTablesInRSGroupResponse,
4052          Pair<List<String>, List<TableName>>> call(controller, stub,
4053          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4054            .build(),
4055            (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4056            resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4057              .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4058      .call();
4059  }
4060
4061  @Override
4062  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4063    return this.<RSGroupInfo> newMasterCaller()
4064      .action(((controller, stub) -> this
4065        .<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse, RSGroupInfo> call(
4066          controller, stub,
4067          GetRSGroupInfoOfServerRequest.newBuilder()
4068            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4069              .setPort(hostPort.getPort()).build())
4070            .build(),
4071          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4072          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4073      .call();
4074  }
4075
4076  @Override
4077  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4078    return this.<Void> newMasterCaller()
4079      .action((controller, stub) -> this.
4080            <RemoveServersRequest, RemoveServersResponse, Void> call(controller, stub,
4081                RequestConverter.buildRemoveServersRequest(servers),
4082              (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4083        .call();
4084  }
4085
4086  @Override
4087  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4088    CompletableFuture<Void> future = new CompletableFuture<>();
4089    for (TableName tableName : tables) {
4090      addListener(tableExists(tableName), (exist, err) -> {
4091        if (err != null) {
4092          future.completeExceptionally(err);
4093          return;
4094        }
4095        if (!exist) {
4096          future.completeExceptionally(new TableNotFoundException(tableName));
4097          return;
4098        }
4099      });
4100    }
4101    addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> {
4102      if (err != null) {
4103        future.completeExceptionally(err);
4104        return;
4105      }
4106      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4107        future.complete(null);
4108        return;
4109      }
4110      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4111      for (TableDescriptor td : tableDescriptions) {
4112        newTableDescriptors
4113            .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4114      }
4115      addListener(CompletableFuture.allOf(
4116        newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4117        (v, e) -> {
4118          if (e != null) {
4119            future.completeExceptionally(e);
4120          } else {
4121            future.complete(v);
4122          }
4123        });
4124    }));
4125    return future;
4126  }
4127
4128  @Override
4129  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4130    return this.<RSGroupInfo> newMasterCaller().action(((controller, stub) -> this
4131      .<GetRSGroupInfoOfTableRequest, GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller,
4132        stub,
4133        GetRSGroupInfoOfTableRequest.newBuilder().setTableName(ProtobufUtil.toProtoTableName(table))
4134          .build(),
4135        (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4136        resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4137      .call();
4138  }
4139
4140  @Override
4141  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4142    return this.<RSGroupInfo> newMasterCaller()
4143      .action(((controller, stub) -> this
4144        .<GetRSGroupInfoRequest, GetRSGroupInfoResponse, RSGroupInfo> call(controller, stub,
4145          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4146          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4147          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4148      .call();
4149  }
4150
4151  @Override
4152  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4153    return this.<Void> newMasterCaller()
4154      .action(
4155        (
4156          (controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4157            controller,
4158            stub,
4159            RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName).setNewRsgroupName(newName)
4160                                .build(),
4161            (s, c, req, done) -> s.renameRSGroup(c, req, done),
4162            resp -> null
4163          )
4164        )
4165      ).call();
4166  }
4167}