001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.CatalogFamilyFormat;
052import org.apache.hadoop.hbase.ClientMetaTableAccessor;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.NamespaceDescriptor;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.RegionMetrics;
061import org.apache.hadoop.hbase.RegionMetricsBuilder;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableExistsException;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.TableNotDisabledException;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.UnknownRegionException;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.Scan.ReadType;
073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
074import org.apache.hadoop.hbase.client.replication.TableCFs;
075import org.apache.hadoop.hbase.client.security.SecurityCapability;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.ipc.HBaseRpcController;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.quotas.QuotaFilter;
080import org.apache.hadoop.hbase.quotas.QuotaSettings;
081import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
083import org.apache.hadoop.hbase.replication.ReplicationException;
084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
086import org.apache.hadoop.hbase.replication.SyncReplicationState;
087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.protobuf.Message;
106import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
107import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
108import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
109import org.apache.hbase.thirdparty.io.netty.util.Timeout;
110import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
111import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
112
113import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
114import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
333
334/**
335 * The implementation of AsyncAdmin.
336 * <p>
337 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
338 * be finished inside the rpc framework thread, which means that the callbacks registered to the
339 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
340 * this class should not try to do time consuming tasks in the callbacks.
341 * @since 2.0.0
342 * @see AsyncHBaseAdmin
343 * @see AsyncConnection#getAdmin()
344 * @see AsyncConnection#getAdminBuilder()
345 */
346@InterfaceAudience.Private
347class RawAsyncHBaseAdmin implements AsyncAdmin {
348
349  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
350
351  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
352
353  private final AsyncConnectionImpl connection;
354
355  private final HashedWheelTimer retryTimer;
356
357  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
358
359  private final long rpcTimeoutNs;
360
361  private final long operationTimeoutNs;
362
363  private final long pauseNs;
364
365  private final long pauseForCQTBENs;
366
367  private final int maxAttempts;
368
369  private final int startLogErrorsCnt;
370
371  private final NonceGenerator ng;
372
373  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
374      AsyncAdminBuilderBase builder) {
375    this.connection = connection;
376    this.retryTimer = retryTimer;
377    this.metaTable = connection.getTable(META_TABLE_NAME);
378    this.rpcTimeoutNs = builder.rpcTimeoutNs;
379    this.operationTimeoutNs = builder.operationTimeoutNs;
380    this.pauseNs = builder.pauseNs;
381    if (builder.pauseForCQTBENs < builder.pauseNs) {
382      LOG.warn(
383        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
384          " the normal pause value {} ms, use the greater one instead",
385        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
386        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
387      this.pauseForCQTBENs = builder.pauseNs;
388    } else {
389      this.pauseForCQTBENs = builder.pauseForCQTBENs;
390    }
391    this.maxAttempts = builder.maxAttempts;
392    this.startLogErrorsCnt = builder.startLogErrorsCnt;
393    this.ng = connection.getNonceGenerator();
394  }
395
396  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
397    return this.connection.callerFactory.<T> masterRequest()
398      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
399      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
400      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
401      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
402  }
403
404  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
405    return this.connection.callerFactory.<T> adminRequest()
406      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
407      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
408      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
409      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
410  }
411
412  @FunctionalInterface
413  private interface MasterRpcCall<RESP, REQ> {
414    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
415        RpcCallback<RESP> done);
416  }
417
418  @FunctionalInterface
419  private interface AdminRpcCall<RESP, REQ> {
420    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
421        RpcCallback<RESP> done);
422  }
423
424  @FunctionalInterface
425  private interface Converter<D, S> {
426    D convert(S src) throws IOException;
427  }
428
429  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
430      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
431      Converter<RESP, PRESP> respConverter) {
432    CompletableFuture<RESP> future = new CompletableFuture<>();
433    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
434
435      @Override
436      public void run(PRESP resp) {
437        if (controller.failed()) {
438          future.completeExceptionally(controller.getFailed());
439        } else {
440          try {
441            future.complete(respConverter.convert(resp));
442          } catch (IOException e) {
443            future.completeExceptionally(e);
444          }
445        }
446      }
447    });
448    return future;
449  }
450
451  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
452      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
453      Converter<RESP, PRESP> respConverter) {
454    CompletableFuture<RESP> future = new CompletableFuture<>();
455    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
456
457      @Override
458      public void run(PRESP resp) {
459        if (controller.failed()) {
460          future.completeExceptionally(controller.getFailed());
461        } else {
462          try {
463            future.complete(respConverter.convert(resp));
464          } catch (IOException e) {
465            future.completeExceptionally(e);
466          }
467        }
468      }
469    });
470    return future;
471  }
472
473  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
474      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
475      ProcedureBiConsumer consumer) {
476    return procedureCall(b -> {
477    }, preq, rpcCall, respConverter, consumer);
478  }
479
480  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
481      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
482      ProcedureBiConsumer consumer) {
483    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
484  }
485
486  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
487      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
488      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
489      ProcedureBiConsumer consumer) {
490    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
491        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
492    prioritySetter.accept(builder);
493    CompletableFuture<Long> procFuture = builder.call();
494    CompletableFuture<Void> future = waitProcedureResult(procFuture);
495    addListener(future, consumer);
496    return future;
497  }
498
499  @FunctionalInterface
500  private interface TableOperator {
501    CompletableFuture<Void> operate(TableName table);
502  }
503
504  @Override
505  public CompletableFuture<Boolean> tableExists(TableName tableName) {
506    if (TableName.isMetaTableName(tableName)) {
507      return CompletableFuture.completedFuture(true);
508    }
509    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
510  }
511
512  @Override
513  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
514    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
515      includeSysTables));
516  }
517
518  /**
519   * {@link #listTableDescriptors(boolean)}
520   */
521  @Override
522  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
523      boolean includeSysTables) {
524    Preconditions.checkNotNull(pattern,
525      "pattern is null. If you don't specify a pattern, "
526          + "use listTableDescriptors(boolean) instead");
527    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
528      includeSysTables));
529  }
530
531  @Override
532  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
533    Preconditions.checkNotNull(tableNames,
534      "tableNames is null. If you don't specify tableNames, "
535          + "use listTableDescriptors(boolean) instead");
536    if (tableNames.isEmpty()) {
537      return CompletableFuture.completedFuture(Collections.emptyList());
538    }
539    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
540  }
541
542  private CompletableFuture<List<TableDescriptor>>
543      getTableDescriptors(GetTableDescriptorsRequest request) {
544    return this.<List<TableDescriptor>> newMasterCaller()
545        .action((controller, stub) -> this
546            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
547              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
548              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
549        .call();
550  }
551
552  @Override
553  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
554    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
555  }
556
557  @Override
558  public CompletableFuture<List<TableName>>
559      listTableNames(Pattern pattern, boolean includeSysTables) {
560    Preconditions.checkNotNull(pattern,
561        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
562    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
563  }
564
565  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
566    return this
567        .<List<TableName>> newMasterCaller()
568        .action(
569          (controller, stub) -> this
570              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
571                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
572                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
573  }
574
575  @Override
576  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
577    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
578        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
579        List<TableDescriptor>> call(
580          controller, stub,
581          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
582          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
583          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
584        .call();
585  }
586
587  @Override
588  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
589    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
590        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
591        List<TableName>> call(
592          controller, stub,
593          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
594          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
595          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
596        .call();
597  }
598
599  @Override
600  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
601    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
602    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
603      .action((controller, stub) -> this
604        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
605          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
606          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
607          (resp) -> resp.getTableSchemaList()))
608      .call(), (tableSchemas, error) -> {
609        if (error != null) {
610          future.completeExceptionally(error);
611          return;
612        }
613        if (!tableSchemas.isEmpty()) {
614          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
615        } else {
616          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
617        }
618      });
619    return future;
620  }
621
622  @Override
623  public CompletableFuture<Void> createTable(TableDescriptor desc) {
624    return createTable(desc.getTableName(),
625      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
626  }
627
628  @Override
629  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
630      int numRegions) {
631    try {
632      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
633    } catch (IllegalArgumentException e) {
634      return failedFuture(e);
635    }
636  }
637
638  @Override
639  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
640    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
641        + " use createTable(TableDescriptor) instead");
642    try {
643      verifySplitKeys(splitKeys);
644      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
645        splitKeys, ng.getNonceGroup(), ng.newNonce()));
646    } catch (IllegalArgumentException e) {
647      return failedFuture(e);
648    }
649  }
650
651  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
652    Preconditions.checkNotNull(tableName, "table name is null");
653    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
654      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
655      new CreateTableProcedureBiConsumer(tableName));
656  }
657
658  @Override
659  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
660    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
661      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
662        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
663      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
664  }
665
666  @Override
667  public CompletableFuture<Void> deleteTable(TableName tableName) {
668    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
669      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
670      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
671      new DeleteTableProcedureBiConsumer(tableName));
672  }
673
674  @Override
675  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
676    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
677      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
678        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
679      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
680  }
681
682  @Override
683  public CompletableFuture<Void> enableTable(TableName tableName) {
684    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
685      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
686      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
687      new EnableTableProcedureBiConsumer(tableName));
688  }
689
690  @Override
691  public CompletableFuture<Void> disableTable(TableName tableName) {
692    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
693      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
694      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
695      new DisableTableProcedureBiConsumer(tableName));
696  }
697
698  /**
699   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
700   * using passed parameters. Sets error or boolean result ('true' if table matches
701   * the passed-in targetState).
702   */
703  private static CompletableFuture<Boolean> completeCheckTableState(
704      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
705      TableState.State targetState, TableName tableName) {
706    if (error != null) {
707      future.completeExceptionally(error);
708    } else {
709      if (tableState != null) {
710        future.complete(tableState.inStates(targetState));
711      } else {
712        future.completeExceptionally(new TableNotFoundException(tableName));
713      }
714    }
715    return future;
716  }
717
718  @Override
719  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
720    if (TableName.isMetaTableName(tableName)) {
721      return CompletableFuture.completedFuture(true);
722    }
723    CompletableFuture<Boolean> future = new CompletableFuture<>();
724    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
725      (tableState, error) -> {
726        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
727          TableState.State.ENABLED, tableName);
728      });
729    return future;
730  }
731
732  @Override
733  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
734    if (TableName.isMetaTableName(tableName)) {
735      return CompletableFuture.completedFuture(false);
736    }
737    CompletableFuture<Boolean> future = new CompletableFuture<>();
738    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
739      (tableState, error) -> {
740        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
741          TableState.State.DISABLED, tableName);
742      });
743    return future;
744  }
745
746  @Override
747  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
748    if (TableName.isMetaTableName(tableName)) {
749      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
750        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
751    }
752    CompletableFuture<Boolean> future = new CompletableFuture<>();
753    addListener(isTableEnabled(tableName), (enabled, error) -> {
754      if (error != null) {
755        if (error instanceof TableNotFoundException) {
756          future.complete(false);
757        } else {
758          future.completeExceptionally(error);
759        }
760        return;
761      }
762      if (!enabled) {
763        future.complete(false);
764      } else {
765        addListener(
766          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
767          (locations, error1) -> {
768            if (error1 != null) {
769              future.completeExceptionally(error1);
770              return;
771            }
772            List<HRegionLocation> notDeployedRegions = locations.stream()
773              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
774            if (notDeployedRegions.size() > 0) {
775              if (LOG.isDebugEnabled()) {
776                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
777              }
778              future.complete(false);
779              return;
780            }
781            future.complete(true);
782          });
783      }
784    });
785    return future;
786  }
787
788  @Override
789  public CompletableFuture<Void> addColumnFamily(
790      TableName tableName, ColumnFamilyDescriptor columnFamily) {
791    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
792      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
793        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
794      new AddColumnFamilyProcedureBiConsumer(tableName));
795  }
796
797  @Override
798  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
799    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
800      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
801        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
802      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
803  }
804
805  @Override
806  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
807      ColumnFamilyDescriptor columnFamily) {
808    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
809      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
810        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
811      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
812  }
813
814  @Override
815  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
816    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
817      RequestConverter.buildCreateNamespaceRequest(descriptor),
818      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
819      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
820  }
821
822  @Override
823  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
824    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
825      RequestConverter.buildModifyNamespaceRequest(descriptor),
826      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
827      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
828  }
829
830  @Override
831  public CompletableFuture<Void> deleteNamespace(String name) {
832    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
833      RequestConverter.buildDeleteNamespaceRequest(name),
834      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
835      new DeleteNamespaceProcedureBiConsumer(name));
836  }
837
838  @Override
839  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
840    return this
841        .<NamespaceDescriptor> newMasterCaller()
842        .action(
843          (controller, stub) -> this
844              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
845                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
846                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
847                      -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
848  }
849
850  @Override
851  public CompletableFuture<List<String>> listNamespaces() {
852    return this
853        .<List<String>> newMasterCaller()
854        .action(
855          (controller, stub) -> this
856              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
857                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
858                  done) -> s.listNamespaces(c, req, done),
859                (resp) -> resp.getNamespaceNameList())).call();
860  }
861
862  @Override
863  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
864    return this
865        .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
866              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
867                  List<NamespaceDescriptor>> call(controller, stub,
868                  ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
869                      s.listNamespaceDescriptors(c, req, done),
870                    (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
871  }
872
873  @Override
874  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
875    return this.<List<RegionInfo>> newAdminCaller()
876        .action((controller, stub) -> this
877            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
878              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
879              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
880              resp -> ProtobufUtil.getRegionInfos(resp)))
881        .serverName(serverName).call();
882  }
883
884  @Override
885  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
886    if (tableName.equals(META_TABLE_NAME)) {
887      return connection.registry.getMetaRegionLocations()
888        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
889          .collect(Collectors.toList()));
890    } else {
891      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
892        .thenApply(
893          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
894    }
895  }
896  @Override
897  public CompletableFuture<Void> flush(TableName tableName) {
898    return flush(tableName, null);
899  }
900
901  @Override
902  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
903    CompletableFuture<Void> future = new CompletableFuture<>();
904    addListener(tableExists(tableName), (exists, err) -> {
905      if (err != null) {
906        future.completeExceptionally(err);
907      } else if (!exists) {
908        future.completeExceptionally(new TableNotFoundException(tableName));
909      } else {
910        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
911          if (err2 != null) {
912            future.completeExceptionally(err2);
913          } else if (!tableEnabled) {
914            future.completeExceptionally(new TableNotEnabledException(tableName));
915          } else {
916            Map<String, String> props = new HashMap<>();
917            if (columnFamily != null) {
918              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
919            }
920            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
921              props), (ret, err3) -> {
922                if (err3 != null) {
923                  future.completeExceptionally(err3);
924                } else {
925                  future.complete(ret);
926                }
927              });
928          }
929        });
930      }
931    });
932    return future;
933  }
934
935  @Override
936  public CompletableFuture<Void> flushRegion(byte[] regionName) {
937    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
938    });
939  }
940
941  @Override
942  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
943    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
944      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
945    return flushRegionInternal(regionName, columnFamily, false)
946      .thenAccept(r -> {});
947  }
948
949  /**
950   * This method is for internal use only, where we need the response of the flush.
951   * <p/>
952   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
953   * API.
954   */
955  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
956    byte[] columnFamily, boolean writeFlushWALMarker) {
957    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
958    addListener(getRegionLocation(regionName), (location, err) -> {
959      if (err != null) {
960        future.completeExceptionally(err);
961        return;
962      }
963      ServerName serverName = location.getServerName();
964      if (serverName == null) {
965        future
966          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
967        return;
968      }
969      addListener(
970        flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
971        (ret, err2) -> {
972          if (err2 != null) {
973            future.completeExceptionally(err2);
974          } else {
975            future.complete(ret);
976          }});
977    });
978    return future;
979  }
980
981  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
982    byte[] columnFamily, boolean writeFlushWALMarker) {
983    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
984      .action((controller, stub) -> this
985        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
986          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(),
987            columnFamily, writeFlushWALMarker),
988          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
989      .call();
990  }
991
992  @Override
993  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
994    CompletableFuture<Void> future = new CompletableFuture<>();
995    addListener(getRegions(sn), (hRegionInfos, err) -> {
996      if (err != null) {
997        future.completeExceptionally(err);
998        return;
999      }
1000      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1001      if (hRegionInfos != null) {
1002        hRegionInfos.forEach(
1003          region -> compactFutures.add(
1004            flush(sn, region, null, false).thenAccept(r -> {})
1005          )
1006        );
1007      }
1008      addListener(CompletableFuture.allOf(
1009        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1010          if (err2 != null) {
1011            future.completeExceptionally(err2);
1012          } else {
1013            future.complete(ret);
1014          }
1015        });
1016    });
1017    return future;
1018  }
1019
1020  @Override
1021  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1022    return compact(tableName, null, false, compactType);
1023  }
1024
1025  @Override
1026  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1027      CompactType compactType) {
1028    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1029        + "If you don't specify a columnFamily, use compact(TableName) instead");
1030    return compact(tableName, columnFamily, false, compactType);
1031  }
1032
1033  @Override
1034  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1035    return compactRegion(regionName, null, false);
1036  }
1037
1038  @Override
1039  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1040    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1041        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1042    return compactRegion(regionName, columnFamily, false);
1043  }
1044
1045  @Override
1046  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1047    return compact(tableName, null, true, compactType);
1048  }
1049
1050  @Override
1051  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1052      CompactType compactType) {
1053    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1054        + "If you don't specify a columnFamily, use compact(TableName) instead");
1055    return compact(tableName, columnFamily, true, compactType);
1056  }
1057
1058  @Override
1059  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1060    return compactRegion(regionName, null, true);
1061  }
1062
1063  @Override
1064  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1065    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1066        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1067    return compactRegion(regionName, columnFamily, true);
1068  }
1069
1070  @Override
1071  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1072    return compactRegionServer(sn, false);
1073  }
1074
1075  @Override
1076  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1077    return compactRegionServer(sn, true);
1078  }
1079
1080  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1081    CompletableFuture<Void> future = new CompletableFuture<>();
1082    addListener(getRegions(sn), (hRegionInfos, err) -> {
1083      if (err != null) {
1084        future.completeExceptionally(err);
1085        return;
1086      }
1087      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1088      if (hRegionInfos != null) {
1089        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1090      }
1091      addListener(CompletableFuture.allOf(
1092        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1093          if (err2 != null) {
1094            future.completeExceptionally(err2);
1095          } else {
1096            future.complete(ret);
1097          }
1098        });
1099    });
1100    return future;
1101  }
1102
1103  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1104      boolean major) {
1105    CompletableFuture<Void> future = new CompletableFuture<>();
1106    addListener(getRegionLocation(regionName), (location, err) -> {
1107      if (err != null) {
1108        future.completeExceptionally(err);
1109        return;
1110      }
1111      ServerName serverName = location.getServerName();
1112      if (serverName == null) {
1113        future
1114          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1115        return;
1116      }
1117      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1118        (ret, err2) -> {
1119          if (err2 != null) {
1120            future.completeExceptionally(err2);
1121          } else {
1122            future.complete(ret);
1123          }
1124        });
1125    });
1126    return future;
1127  }
1128
1129  /**
1130   * List all region locations for the specific table.
1131   */
1132  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1133    if (TableName.META_TABLE_NAME.equals(tableName)) {
1134      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1135      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1136        if (err != null) {
1137          future.completeExceptionally(err);
1138        } else if (metaRegions == null || metaRegions.isEmpty() ||
1139          metaRegions.getDefaultRegionLocation() == null) {
1140          future.completeExceptionally(new IOException("meta region does not found"));
1141        } else {
1142          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1143        }
1144      });
1145      return future;
1146    } else {
1147      // For non-meta table, we fetch all locations by scanning hbase:meta table
1148      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1149    }
1150  }
1151
1152  /**
1153   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1154   */
1155  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1156      CompactType compactType) {
1157    CompletableFuture<Void> future = new CompletableFuture<>();
1158
1159    switch (compactType) {
1160      case MOB:
1161        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1162          if (err != null) {
1163            future.completeExceptionally(err);
1164            return;
1165          }
1166          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1167          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1168            if (err2 != null) {
1169              future.completeExceptionally(err2);
1170            } else {
1171              future.complete(ret);
1172            }
1173          });
1174        });
1175        break;
1176      case NORMAL:
1177        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1178          if (err != null) {
1179            future.completeExceptionally(err);
1180            return;
1181          }
1182          if (locations == null || locations.isEmpty()) {
1183            future.completeExceptionally(new TableNotFoundException(tableName));
1184          }
1185          CompletableFuture<?>[] compactFutures =
1186            locations.stream().filter(l -> l.getRegion() != null)
1187              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1188              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1189              .toArray(CompletableFuture<?>[]::new);
1190          // future complete unless all of the compact futures are completed.
1191          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1192            if (err2 != null) {
1193              future.completeExceptionally(err2);
1194            } else {
1195              future.complete(ret);
1196            }
1197          });
1198        });
1199        break;
1200      default:
1201        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1202    }
1203    return future;
1204  }
1205
1206  /**
1207   * Compact the region at specific region server.
1208   */
1209  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1210      final boolean major, byte[] columnFamily) {
1211    return this
1212        .<Void> newAdminCaller()
1213        .serverName(sn)
1214        .action(
1215          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1216            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1217              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1218            resp -> null)).call();
1219  }
1220
1221  private byte[] toEncodeRegionName(byte[] regionName) {
1222    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1223      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1224  }
1225
1226  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1227      CompletableFuture<TableName> result) {
1228    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1229      if (err != null) {
1230        result.completeExceptionally(err);
1231        return;
1232      }
1233      RegionInfo regionInfo = location.getRegion();
1234      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1235        result.completeExceptionally(
1236          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1237        return;
1238      }
1239      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1240        if (!tableName.get().equals(regionInfo.getTable())) {
1241          // tables of this two region should be same.
1242          result.completeExceptionally(
1243            new IllegalArgumentException("Cannot merge regions from two different tables " +
1244              tableName.get() + " and " + regionInfo.getTable()));
1245        } else {
1246          result.complete(tableName.get());
1247        }
1248      }
1249    });
1250  }
1251
1252  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1253    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1254    CompletableFuture<TableName> future = new CompletableFuture<>();
1255    for (byte[] encodedRegionName : encodedRegionNames) {
1256      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1257    }
1258    return future;
1259  }
1260
1261  @Override
1262  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1263    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1264  }
1265
1266  @Override
1267  public CompletableFuture<Boolean> isMergeEnabled() {
1268    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1269  }
1270
1271  @Override
1272  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1273    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1274  }
1275
1276  @Override
1277  public CompletableFuture<Boolean> isSplitEnabled() {
1278    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1279  }
1280
1281  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1282      MasterSwitchType switchType) {
1283    SetSplitOrMergeEnabledRequest request =
1284      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1285    return this.<Boolean> newMasterCaller()
1286      .action((controller, stub) -> this
1287        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1288          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1289          (resp) -> resp.getPrevValueList().get(0)))
1290      .call();
1291  }
1292
1293  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1294    IsSplitOrMergeEnabledRequest request =
1295        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1296    return this
1297        .<Boolean> newMasterCaller()
1298        .action(
1299          (controller, stub) -> this
1300              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1301                controller, stub, request,
1302                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1303                (resp) -> resp.getEnabled())).call();
1304  }
1305
1306  @Override
1307  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1308    if (nameOfRegionsToMerge.size() < 2) {
1309      return failedFuture(new IllegalArgumentException(
1310        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1311    }
1312    CompletableFuture<Void> future = new CompletableFuture<>();
1313    byte[][] encodedNameOfRegionsToMerge =
1314      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1315
1316    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1317      if (err != null) {
1318        future.completeExceptionally(err);
1319        return;
1320      }
1321
1322      final MergeTableRegionsRequest request;
1323      try {
1324        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1325          forcible, ng.getNonceGroup(), ng.newNonce());
1326      } catch (DeserializationException e) {
1327        future.completeExceptionally(e);
1328        return;
1329      }
1330
1331      addListener(
1332        this.procedureCall(tableName, request,
1333          MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
1334          new MergeTableRegionProcedureBiConsumer(tableName)),
1335        (ret, err2) -> {
1336          if (err2 != null) {
1337            future.completeExceptionally(err2);
1338          } else {
1339            future.complete(ret);
1340          }
1341        });
1342    });
1343    return future;
1344  }
1345
1346  @Override
1347  public CompletableFuture<Void> split(TableName tableName) {
1348    CompletableFuture<Void> future = new CompletableFuture<>();
1349    addListener(tableExists(tableName), (exist, error) -> {
1350      if (error != null) {
1351        future.completeExceptionally(error);
1352        return;
1353      }
1354      if (!exist) {
1355        future.completeExceptionally(new TableNotFoundException(tableName));
1356        return;
1357      }
1358      addListener(metaTable
1359        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1360          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1361            ClientMetaTableAccessor.QueryType.REGION))
1362          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1363            ClientMetaTableAccessor.QueryType.REGION))),
1364        (results, err2) -> {
1365          if (err2 != null) {
1366            future.completeExceptionally(err2);
1367            return;
1368          }
1369          if (results != null && !results.isEmpty()) {
1370            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1371            for (Result r : results) {
1372              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1373                continue;
1374              }
1375              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1376              if (rl != null) {
1377                for (HRegionLocation h : rl.getRegionLocations()) {
1378                  if (h != null && h.getServerName() != null) {
1379                    RegionInfo hri = h.getRegion();
1380                    if (hri == null || hri.isSplitParent() ||
1381                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1382                      continue;
1383                    }
1384                    splitFutures.add(split(hri, null));
1385                  }
1386                }
1387              }
1388            }
1389            addListener(
1390              CompletableFuture
1391                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1392              (ret, exception) -> {
1393                if (exception != null) {
1394                  future.completeExceptionally(exception);
1395                  return;
1396                }
1397                future.complete(ret);
1398              });
1399          } else {
1400            future.complete(null);
1401          }
1402        });
1403    });
1404    return future;
1405  }
1406
1407  @Override
1408  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1409    CompletableFuture<Void> result = new CompletableFuture<>();
1410    if (splitPoint == null) {
1411      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1412    }
1413    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1414      (loc, err) -> {
1415        if (err != null) {
1416          result.completeExceptionally(err);
1417        } else if (loc == null || loc.getRegion() == null) {
1418          result.completeExceptionally(new IllegalArgumentException(
1419            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1420        } else {
1421          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1422            if (err2 != null) {
1423              result.completeExceptionally(err2);
1424            } else {
1425              result.complete(ret);
1426            }
1427
1428          });
1429        }
1430      });
1431    return result;
1432  }
1433
1434  @Override
1435  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1436    CompletableFuture<Void> future = new CompletableFuture<>();
1437    addListener(getRegionLocation(regionName), (location, err) -> {
1438      if (err != null) {
1439        future.completeExceptionally(err);
1440        return;
1441      }
1442      RegionInfo regionInfo = location.getRegion();
1443      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1444        future
1445          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1446            "Replicas are auto-split when their primary is split."));
1447        return;
1448      }
1449      ServerName serverName = location.getServerName();
1450      if (serverName == null) {
1451        future
1452          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1453        return;
1454      }
1455      addListener(split(regionInfo, null), (ret, err2) -> {
1456        if (err2 != null) {
1457          future.completeExceptionally(err2);
1458        } else {
1459          future.complete(ret);
1460        }
1461      });
1462    });
1463    return future;
1464  }
1465
1466  @Override
1467  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1468    Preconditions.checkNotNull(splitPoint,
1469      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1470    CompletableFuture<Void> future = new CompletableFuture<>();
1471    addListener(getRegionLocation(regionName), (location, err) -> {
1472      if (err != null) {
1473        future.completeExceptionally(err);
1474        return;
1475      }
1476      RegionInfo regionInfo = location.getRegion();
1477      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1478        future
1479          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1480            "Replicas are auto-split when their primary is split."));
1481        return;
1482      }
1483      ServerName serverName = location.getServerName();
1484      if (serverName == null) {
1485        future
1486          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1487        return;
1488      }
1489      if (regionInfo.getStartKey() != null &&
1490        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1491        future.completeExceptionally(
1492          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1493        return;
1494      }
1495      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1496        if (err2 != null) {
1497          future.completeExceptionally(err2);
1498        } else {
1499          future.complete(ret);
1500        }
1501      });
1502    });
1503    return future;
1504  }
1505
1506  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1507    CompletableFuture<Void> future = new CompletableFuture<>();
1508    TableName tableName = hri.getTable();
1509    final SplitTableRegionRequest request;
1510    try {
1511      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1512        ng.newNonce());
1513    } catch (DeserializationException e) {
1514      future.completeExceptionally(e);
1515      return future;
1516    }
1517
1518    addListener(
1519      this.procedureCall(tableName,
1520        request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
1521        new SplitTableRegionProcedureBiConsumer(tableName)),
1522      (ret, err2) -> {
1523        if (err2 != null) {
1524          future.completeExceptionally(err2);
1525        } else {
1526          future.complete(ret);
1527        }
1528      });
1529    return future;
1530  }
1531
1532  @Override
1533  public CompletableFuture<Void> assign(byte[] regionName) {
1534    CompletableFuture<Void> future = new CompletableFuture<>();
1535    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1536      if (err != null) {
1537        future.completeExceptionally(err);
1538        return;
1539      }
1540      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1541        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1542          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1543          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1544        .call(), (ret, err2) -> {
1545          if (err2 != null) {
1546            future.completeExceptionally(err2);
1547          } else {
1548            future.complete(ret);
1549          }
1550        });
1551    });
1552    return future;
1553  }
1554
1555  @Override
1556  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1557    CompletableFuture<Void> future = new CompletableFuture<>();
1558    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1559      if (err != null) {
1560        future.completeExceptionally(err);
1561        return;
1562      }
1563      addListener(
1564        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1565          .action(((controller, stub) -> this
1566            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1567              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1568              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1569          .call(),
1570        (ret, err2) -> {
1571          if (err2 != null) {
1572            future.completeExceptionally(err2);
1573          } else {
1574            future.complete(ret);
1575          }
1576        });
1577    });
1578    return future;
1579  }
1580
1581  @Override
1582  public CompletableFuture<Void> offline(byte[] regionName) {
1583    CompletableFuture<Void> future = new CompletableFuture<>();
1584    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1585      if (err != null) {
1586        future.completeExceptionally(err);
1587        return;
1588      }
1589      addListener(
1590        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1591          .action(((controller, stub) -> this
1592            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1593              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1594              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1595          .call(),
1596        (ret, err2) -> {
1597          if (err2 != null) {
1598            future.completeExceptionally(err2);
1599          } else {
1600            future.complete(ret);
1601          }
1602        });
1603    });
1604    return future;
1605  }
1606
1607  @Override
1608  public CompletableFuture<Void> move(byte[] regionName) {
1609    CompletableFuture<Void> future = new CompletableFuture<>();
1610    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1611      if (err != null) {
1612        future.completeExceptionally(err);
1613        return;
1614      }
1615      addListener(
1616        moveRegion(regionInfo,
1617          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1618        (ret, err2) -> {
1619          if (err2 != null) {
1620            future.completeExceptionally(err2);
1621          } else {
1622            future.complete(ret);
1623          }
1624        });
1625    });
1626    return future;
1627  }
1628
1629  @Override
1630  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1631    Preconditions.checkNotNull(destServerName,
1632      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1633    CompletableFuture<Void> future = new CompletableFuture<>();
1634    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1635      if (err != null) {
1636        future.completeExceptionally(err);
1637        return;
1638      }
1639      addListener(
1640        moveRegion(regionInfo, RequestConverter
1641          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1642        (ret, err2) -> {
1643          if (err2 != null) {
1644            future.completeExceptionally(err2);
1645          } else {
1646            future.complete(ret);
1647          }
1648        });
1649    });
1650    return future;
1651  }
1652
1653  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1654    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1655      .action(
1656        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1657          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1658      .call();
1659  }
1660
1661  @Override
1662  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1663    return this
1664        .<Void> newMasterCaller()
1665        .action(
1666          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1667            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1668            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1669  }
1670
1671  @Override
1672  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1673    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1674    Scan scan = QuotaTableUtil.makeScan(filter);
1675    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1676        .scan(scan, new AdvancedScanResultConsumer() {
1677          List<QuotaSettings> settings = new ArrayList<>();
1678
1679          @Override
1680          public void onNext(Result[] results, ScanController controller) {
1681            for (Result result : results) {
1682              try {
1683                QuotaTableUtil.parseResultToCollection(result, settings);
1684              } catch (IOException e) {
1685                controller.terminate();
1686                future.completeExceptionally(e);
1687              }
1688            }
1689          }
1690
1691          @Override
1692          public void onError(Throwable error) {
1693            future.completeExceptionally(error);
1694          }
1695
1696          @Override
1697          public void onComplete() {
1698            future.complete(settings);
1699          }
1700        });
1701    return future;
1702  }
1703
1704  @Override
1705  public CompletableFuture<Void> addReplicationPeer(String peerId,
1706      ReplicationPeerConfig peerConfig, boolean enabled) {
1707    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1708      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1709      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1710      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1711  }
1712
1713  @Override
1714  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1715    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1716      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1717      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1718      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1719  }
1720
1721  @Override
1722  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1723    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1724      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1725      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1726      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1727  }
1728
1729  @Override
1730  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1731    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1732      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1733      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1734      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1735  }
1736
1737  @Override
1738  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1739    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1740      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
1741          call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1742            (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1743            (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1744  }
1745
1746  @Override
1747  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1748      ReplicationPeerConfig peerConfig) {
1749    return this
1750      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1751        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1752        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1753        (resp) -> resp.getProcId(),
1754        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1755  }
1756
1757  @Override
1758  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1759      SyncReplicationState clusterState) {
1760    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1761        TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1762        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1763          clusterState),
1764          (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1765          (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1766            () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1767  }
1768
1769  @Override
1770  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1771      Map<TableName, List<String>> tableCfs) {
1772    if (tableCfs == null) {
1773      return failedFuture(new ReplicationException("tableCfs is null"));
1774    }
1775
1776    CompletableFuture<Void> future = new CompletableFuture<Void>();
1777    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1778      if (!completeExceptionally(future, error)) {
1779        ReplicationPeerConfig newPeerConfig =
1780          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1781        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1782          if (!completeExceptionally(future, error)) {
1783            future.complete(result);
1784          }
1785        });
1786      }
1787    });
1788    return future;
1789  }
1790
1791  @Override
1792  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1793      Map<TableName, List<String>> tableCfs) {
1794    if (tableCfs == null) {
1795      return failedFuture(new ReplicationException("tableCfs is null"));
1796    }
1797
1798    CompletableFuture<Void> future = new CompletableFuture<Void>();
1799    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1800      if (!completeExceptionally(future, error)) {
1801        ReplicationPeerConfig newPeerConfig = null;
1802        try {
1803          newPeerConfig = ReplicationPeerConfigUtil
1804            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1805        } catch (ReplicationException e) {
1806          future.completeExceptionally(e);
1807          return;
1808        }
1809        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1810          if (!completeExceptionally(future, error)) {
1811            future.complete(result);
1812          }
1813        });
1814      }
1815    });
1816    return future;
1817  }
1818
1819  @Override
1820  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1821    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1822  }
1823
1824  @Override
1825  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1826    Preconditions.checkNotNull(pattern,
1827      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1828    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1829  }
1830
1831  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1832      ListReplicationPeersRequest request) {
1833    return this
1834        .<List<ReplicationPeerDescription>> newMasterCaller()
1835        .action(
1836          (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1837              List<ReplicationPeerDescription>> call(controller, stub, request,
1838                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1839                (resp) -> resp.getPeerDescList().stream()
1840                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1841                    .collect(Collectors.toList()))).call();
1842  }
1843
1844  @Override
1845  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1846    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1847    addListener(listTableDescriptors(), (tables, error) -> {
1848      if (!completeExceptionally(future, error)) {
1849        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1850        tables.forEach(table -> {
1851          Map<String, Integer> cfs = new HashMap<>();
1852          Stream.of(table.getColumnFamilies())
1853            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1854            .forEach(column -> {
1855              cfs.put(column.getNameAsString(), column.getScope());
1856            });
1857          if (!cfs.isEmpty()) {
1858            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1859          }
1860        });
1861        future.complete(replicatedTableCFs);
1862      }
1863    });
1864    return future;
1865  }
1866
1867  @Override
1868  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1869    SnapshotProtos.SnapshotDescription snapshot =
1870      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1871    try {
1872      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1873    } catch (IllegalArgumentException e) {
1874      return failedFuture(e);
1875    }
1876    CompletableFuture<Void> future = new CompletableFuture<>();
1877    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1878    addListener(this.<Long> newMasterCaller()
1879      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1880        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1881        resp -> resp.getExpectedTimeout()))
1882      .call(), (expectedTimeout, err) -> {
1883        if (err != null) {
1884          future.completeExceptionally(err);
1885          return;
1886        }
1887        TimerTask pollingTask = new TimerTask() {
1888          int tries = 0;
1889          long startTime = EnvironmentEdgeManager.currentTime();
1890          long endTime = startTime + expectedTimeout;
1891          long maxPauseTime = expectedTimeout / maxAttempts;
1892
1893          @Override
1894          public void run(Timeout timeout) throws Exception {
1895            if (EnvironmentEdgeManager.currentTime() < endTime) {
1896              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1897                if (err2 != null) {
1898                  future.completeExceptionally(err2);
1899                } else if (done) {
1900                  future.complete(null);
1901                } else {
1902                  // retry again after pauseTime.
1903                  long pauseTime =
1904                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1905                  pauseTime = Math.min(pauseTime, maxPauseTime);
1906                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1907                    TimeUnit.MILLISECONDS);
1908                }
1909              });
1910            } else {
1911              future.completeExceptionally(
1912                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1913                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1914            }
1915          }
1916        };
1917        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1918      });
1919    return future;
1920  }
1921
1922  @Override
1923  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1924    return this
1925        .<Boolean> newMasterCaller()
1926        .action(
1927          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1928            controller,
1929            stub,
1930            IsSnapshotDoneRequest.newBuilder()
1931                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1932                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1933  }
1934
1935  @Override
1936  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1937    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1938      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1939      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1940    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1941  }
1942
1943  @Override
1944  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1945      boolean restoreAcl) {
1946    CompletableFuture<Void> future = new CompletableFuture<>();
1947    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1948      if (err != null) {
1949        future.completeExceptionally(err);
1950        return;
1951      }
1952      TableName tableName = null;
1953      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1954        for (SnapshotDescription snap : snapshotDescriptions) {
1955          if (snap.getName().equals(snapshotName)) {
1956            tableName = snap.getTableName();
1957            break;
1958          }
1959        }
1960      }
1961      if (tableName == null) {
1962        future.completeExceptionally(new RestoreSnapshotException(
1963          "Unable to find the table name for snapshot=" + snapshotName));
1964        return;
1965      }
1966      final TableName finalTableName = tableName;
1967      addListener(tableExists(finalTableName), (exists, err2) -> {
1968        if (err2 != null) {
1969          future.completeExceptionally(err2);
1970        } else if (!exists) {
1971          // if table does not exist, then just clone snapshot into new table.
1972          completeConditionalOnFuture(future,
1973            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1974        } else {
1975          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1976            if (err4 != null) {
1977              future.completeExceptionally(err4);
1978            } else if (!disabled) {
1979              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1980            } else {
1981              completeConditionalOnFuture(future,
1982                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1983            }
1984          });
1985        }
1986      });
1987    });
1988    return future;
1989  }
1990
1991  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1992      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1993    if (takeFailSafeSnapshot) {
1994      CompletableFuture<Void> future = new CompletableFuture<>();
1995      // Step.1 Take a snapshot of the current state
1996      String failSafeSnapshotSnapshotNameFormat =
1997        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1998          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1999      final String failSafeSnapshotSnapshotName =
2000        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2001          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2002          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2003      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2004      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2005        if (err != null) {
2006          future.completeExceptionally(err);
2007        } else {
2008          // Step.2 Restore snapshot
2009          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
2010            (void2, err2) -> {
2011              if (err2 != null) {
2012                // Step.3.a Something went wrong during the restore and try to rollback.
2013                addListener(
2014                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
2015                  (void3, err3) -> {
2016                    if (err3 != null) {
2017                      future.completeExceptionally(err3);
2018                    } else {
2019                      String msg =
2020                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
2021                          failSafeSnapshotSnapshotName + " succeeded.";
2022                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2023                    }
2024                  });
2025              } else {
2026                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2027                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2028                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2029                  if (err3 != null) {
2030                    LOG.error(
2031                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2032                      err3);
2033                  }
2034                  future.complete(ret3);
2035                });
2036              }
2037            });
2038        }
2039      });
2040      return future;
2041    } else {
2042      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
2043    }
2044  }
2045
2046  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2047      CompletableFuture<T> parentFuture) {
2048    addListener(parentFuture, (res, err) -> {
2049      if (err != null) {
2050        dependentFuture.completeExceptionally(err);
2051      } else {
2052        dependentFuture.complete(res);
2053      }
2054    });
2055  }
2056
2057  @Override
2058  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2059      boolean restoreAcl) {
2060    CompletableFuture<Void> future = new CompletableFuture<>();
2061    addListener(tableExists(tableName), (exists, err) -> {
2062      if (err != null) {
2063        future.completeExceptionally(err);
2064      } else if (exists) {
2065        future.completeExceptionally(new TableExistsException(tableName));
2066      } else {
2067        completeConditionalOnFuture(future,
2068          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2069      }
2070    });
2071    return future;
2072  }
2073
2074  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2075      boolean restoreAcl) {
2076    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2077      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2078    try {
2079      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2080    } catch (IllegalArgumentException e) {
2081      return failedFuture(e);
2082    }
2083    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2084      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2085        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2086          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2087        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2088      .call());
2089  }
2090
2091  @Override
2092  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2093    return getCompletedSnapshots(null);
2094  }
2095
2096  @Override
2097  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2098    Preconditions.checkNotNull(pattern,
2099      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2100    return getCompletedSnapshots(pattern);
2101  }
2102
2103  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2104    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2105        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2106        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2107          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2108          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2109        .call();
2110  }
2111
2112  @Override
2113  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2114    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2115        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2116    return getCompletedSnapshots(tableNamePattern, null);
2117  }
2118
2119  @Override
2120  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2121      Pattern snapshotNamePattern) {
2122    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2123        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2124    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2125        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2126    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2127  }
2128
2129  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2130      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2131    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2132    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2133      if (err != null) {
2134        future.completeExceptionally(err);
2135        return;
2136      }
2137      if (tableNames == null || tableNames.size() <= 0) {
2138        future.complete(Collections.emptyList());
2139        return;
2140      }
2141      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2142        if (err2 != null) {
2143          future.completeExceptionally(err2);
2144          return;
2145        }
2146        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2147          future.complete(Collections.emptyList());
2148          return;
2149        }
2150        future.complete(snapshotDescList.stream()
2151          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2152          .collect(Collectors.toList()));
2153      });
2154    });
2155    return future;
2156  }
2157
2158  @Override
2159  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2160    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2161  }
2162
2163  @Override
2164  public CompletableFuture<Void> deleteSnapshots() {
2165    return internalDeleteSnapshots(null, null);
2166  }
2167
2168  @Override
2169  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2170    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2171        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2172    return internalDeleteSnapshots(null, snapshotNamePattern);
2173  }
2174
2175  @Override
2176  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2177    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2178        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2179    return internalDeleteSnapshots(tableNamePattern, null);
2180  }
2181
2182  @Override
2183  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2184      Pattern snapshotNamePattern) {
2185    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2186        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2187    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2188        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2189    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2190  }
2191
2192  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2193      Pattern snapshotNamePattern) {
2194    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2195    if (tableNamePattern == null) {
2196      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2197    } else {
2198      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2199    }
2200    CompletableFuture<Void> future = new CompletableFuture<>();
2201    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2202      if (err != null) {
2203        future.completeExceptionally(err);
2204        return;
2205      }
2206      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2207        future.complete(null);
2208        return;
2209      }
2210      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2211        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2212          if (e != null) {
2213            future.completeExceptionally(e);
2214          } else {
2215            future.complete(v);
2216          }
2217        });
2218    }));
2219    return future;
2220  }
2221
2222  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2223    return this
2224        .<Void> newMasterCaller()
2225        .action(
2226          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2227            controller,
2228            stub,
2229            DeleteSnapshotRequest.newBuilder()
2230                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2231                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2232  }
2233
2234  @Override
2235  public CompletableFuture<Void> execProcedure(String signature, String instance,
2236      Map<String, String> props) {
2237    CompletableFuture<Void> future = new CompletableFuture<>();
2238    ProcedureDescription procDesc =
2239      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2240    addListener(this.<Long> newMasterCaller()
2241      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2242        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2243        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2244      .call(), (expectedTimeout, err) -> {
2245        if (err != null) {
2246          future.completeExceptionally(err);
2247          return;
2248        }
2249        TimerTask pollingTask = new TimerTask() {
2250          int tries = 0;
2251          long startTime = EnvironmentEdgeManager.currentTime();
2252          long endTime = startTime + expectedTimeout;
2253          long maxPauseTime = expectedTimeout / maxAttempts;
2254
2255          @Override
2256          public void run(Timeout timeout) throws Exception {
2257            if (EnvironmentEdgeManager.currentTime() < endTime) {
2258              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2259                if (err2 != null) {
2260                  future.completeExceptionally(err2);
2261                  return;
2262                }
2263                if (done) {
2264                  future.complete(null);
2265                } else {
2266                  // retry again after pauseTime.
2267                  long pauseTime =
2268                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2269                  pauseTime = Math.min(pauseTime, maxPauseTime);
2270                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2271                    TimeUnit.MICROSECONDS);
2272                }
2273              });
2274            } else {
2275              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2276                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2277            }
2278          }
2279        };
2280        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2281        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2282      });
2283    return future;
2284  }
2285
2286  @Override
2287  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2288      Map<String, String> props) {
2289    ProcedureDescription proDesc =
2290        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2291    return this.<byte[]> newMasterCaller()
2292        .action(
2293          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2294            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2295            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2296            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2297        .call();
2298  }
2299
2300  @Override
2301  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2302      Map<String, String> props) {
2303    ProcedureDescription proDesc =
2304        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2305    return this.<Boolean> newMasterCaller()
2306        .action((controller, stub) -> this
2307            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2308              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2309              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2310        .call();
2311  }
2312
2313  @Override
2314  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2315    return this.<Boolean> newMasterCaller().action(
2316      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2317        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2318        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2319        .call();
2320  }
2321
2322  @Override
2323  public CompletableFuture<String> getProcedures() {
2324    return this
2325        .<String> newMasterCaller()
2326        .action(
2327          (controller, stub) -> this
2328              .<GetProceduresRequest, GetProceduresResponse, String> call(
2329                controller, stub, GetProceduresRequest.newBuilder().build(),
2330                (s, c, req, done) -> s.getProcedures(c, req, done),
2331                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2332  }
2333
2334  @Override
2335  public CompletableFuture<String> getLocks() {
2336    return this
2337        .<String> newMasterCaller()
2338        .action(
2339          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2340            controller, stub, GetLocksRequest.newBuilder().build(),
2341            (s, c, req, done) -> s.getLocks(c, req, done),
2342            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2343  }
2344
2345  @Override
2346  public CompletableFuture<Void> decommissionRegionServers(
2347      List<ServerName> servers, boolean offload) {
2348    return this.<Void> newMasterCaller()
2349        .action((controller, stub) -> this
2350          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2351            controller, stub,
2352              RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2353            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2354        .call();
2355  }
2356
2357  @Override
2358  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2359    return this.<List<ServerName>> newMasterCaller()
2360        .action((controller, stub) -> this
2361          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2362            List<ServerName>> call(
2363              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2364              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2365              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2366                  .collect(Collectors.toList())))
2367        .call();
2368  }
2369
2370  @Override
2371  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2372      List<byte[]> encodedRegionNames) {
2373    return this.<Void> newMasterCaller()
2374        .action((controller, stub) ->
2375            this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2376                controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2377                    server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2378                        c, req, done), resp -> null)).call();
2379  }
2380
2381  /**
2382   * Get the region location for the passed region name. The region name may be a full region name
2383   * or encoded region name. If the region does not found, then it'll throw an
2384   * UnknownRegionException wrapped by a {@link CompletableFuture}
2385   * @param regionNameOrEncodedRegionName region name or encoded region name
2386   * @return region location, wrapped by a {@link CompletableFuture}
2387   */
2388  @VisibleForTesting
2389  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2390    if (regionNameOrEncodedRegionName == null) {
2391      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2392    }
2393    try {
2394      CompletableFuture<Optional<HRegionLocation>> future;
2395      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2396        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2397        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2398          // old format encodedName, should be meta region
2399          future = connection.registry.getMetaRegionLocations()
2400            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2401              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2402        } else {
2403          future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2404            regionNameOrEncodedRegionName);
2405        }
2406      } else {
2407        RegionInfo regionInfo =
2408          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2409        if (regionInfo.isMetaRegion()) {
2410          future = connection.registry.getMetaRegionLocations()
2411            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2412              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2413              .findFirst());
2414        } else {
2415          future =
2416            ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2417        }
2418      }
2419
2420      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2421      addListener(future, (location, err) -> {
2422        if (err != null) {
2423          returnedFuture.completeExceptionally(err);
2424          return;
2425        }
2426        if (!location.isPresent() || location.get().getRegion() == null) {
2427          returnedFuture.completeExceptionally(
2428            new UnknownRegionException("Invalid region name or encoded region name: " +
2429              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2430        } else {
2431          returnedFuture.complete(location.get());
2432        }
2433      });
2434      return returnedFuture;
2435    } catch (IOException e) {
2436      return failedFuture(e);
2437    }
2438  }
2439
2440  /**
2441   * Get the region info for the passed region name. The region name may be a full region name or
2442   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2443   * wrapped by a {@link CompletableFuture}
2444   * @return region info, wrapped by a {@link CompletableFuture}
2445   */
2446  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2447    if (regionNameOrEncodedRegionName == null) {
2448      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2449    }
2450
2451    if (Bytes.equals(regionNameOrEncodedRegionName,
2452      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2453      Bytes.equals(regionNameOrEncodedRegionName,
2454        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2455      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2456    }
2457
2458    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2459    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2460      if (err != null) {
2461        future.completeExceptionally(err);
2462      } else {
2463        future.complete(location.getRegion());
2464      }
2465    });
2466    return future;
2467  }
2468
2469  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2470    if (numRegions < 3) {
2471      throw new IllegalArgumentException("Must create at least three regions");
2472    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2473      throw new IllegalArgumentException("Start key must be smaller than end key");
2474    }
2475    if (numRegions == 3) {
2476      return new byte[][] { startKey, endKey };
2477    }
2478    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2479    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2480      throw new IllegalArgumentException("Unable to split key range into enough regions");
2481    }
2482    return splitKeys;
2483  }
2484
2485  private void verifySplitKeys(byte[][] splitKeys) {
2486    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2487    // Verify there are no duplicate split keys
2488    byte[] lastKey = null;
2489    for (byte[] splitKey : splitKeys) {
2490      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2491        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2492      }
2493      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2494        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2495            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2496      }
2497      lastKey = splitKey;
2498    }
2499  }
2500
2501  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2502
2503    abstract void onFinished();
2504
2505    abstract void onError(Throwable error);
2506
2507    @Override
2508    public void accept(Void v, Throwable error) {
2509      if (error != null) {
2510        onError(error);
2511        return;
2512      }
2513      onFinished();
2514    }
2515  }
2516
2517  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2518    protected final TableName tableName;
2519
2520    TableProcedureBiConsumer(TableName tableName) {
2521      this.tableName = tableName;
2522    }
2523
2524    abstract String getOperationType();
2525
2526    String getDescription() {
2527      return "Operation: " + getOperationType() + ", " + "Table Name: "
2528          + tableName.getNameWithNamespaceInclAsString();
2529    }
2530
2531    @Override
2532    void onFinished() {
2533      LOG.info(getDescription() + " completed");
2534    }
2535
2536    @Override
2537    void onError(Throwable error) {
2538      LOG.info(getDescription() + " failed with " + error.getMessage());
2539    }
2540  }
2541
2542  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2543    protected final String namespaceName;
2544
2545    NamespaceProcedureBiConsumer(String namespaceName) {
2546      this.namespaceName = namespaceName;
2547    }
2548
2549    abstract String getOperationType();
2550
2551    String getDescription() {
2552      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2553    }
2554
2555    @Override
2556    void onFinished() {
2557      LOG.info(getDescription() + " completed");
2558    }
2559
2560    @Override
2561    void onError(Throwable error) {
2562      LOG.info(getDescription() + " failed with " + error.getMessage());
2563    }
2564  }
2565
2566  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2567
2568    CreateTableProcedureBiConsumer(TableName tableName) {
2569      super(tableName);
2570    }
2571
2572    @Override
2573    String getOperationType() {
2574      return "CREATE";
2575    }
2576  }
2577
2578  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2579
2580    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2581      super(tableName);
2582    }
2583
2584    @Override
2585    String getOperationType() {
2586      return "ENABLE";
2587    }
2588  }
2589
2590  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2591
2592    DeleteTableProcedureBiConsumer(TableName tableName) {
2593      super(tableName);
2594    }
2595
2596    @Override
2597    String getOperationType() {
2598      return "DELETE";
2599    }
2600
2601    @Override
2602    void onFinished() {
2603      connection.getLocator().clearCache(this.tableName);
2604      super.onFinished();
2605    }
2606  }
2607
2608  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2609
2610    TruncateTableProcedureBiConsumer(TableName tableName) {
2611      super(tableName);
2612    }
2613
2614    @Override
2615    String getOperationType() {
2616      return "TRUNCATE";
2617    }
2618  }
2619
2620  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2621
2622    EnableTableProcedureBiConsumer(TableName tableName) {
2623      super(tableName);
2624    }
2625
2626    @Override
2627    String getOperationType() {
2628      return "ENABLE";
2629    }
2630  }
2631
2632  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2633
2634    DisableTableProcedureBiConsumer(TableName tableName) {
2635      super(tableName);
2636    }
2637
2638    @Override
2639    String getOperationType() {
2640      return "DISABLE";
2641    }
2642  }
2643
2644  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2645
2646    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2647      super(tableName);
2648    }
2649
2650    @Override
2651    String getOperationType() {
2652      return "ADD_COLUMN_FAMILY";
2653    }
2654  }
2655
2656  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2657
2658    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2659      super(tableName);
2660    }
2661
2662    @Override
2663    String getOperationType() {
2664      return "DELETE_COLUMN_FAMILY";
2665    }
2666  }
2667
2668  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2669
2670    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2671      super(tableName);
2672    }
2673
2674    @Override
2675    String getOperationType() {
2676      return "MODIFY_COLUMN_FAMILY";
2677    }
2678  }
2679
2680  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2681
2682    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2683      super(namespaceName);
2684    }
2685
2686    @Override
2687    String getOperationType() {
2688      return "CREATE_NAMESPACE";
2689    }
2690  }
2691
2692  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2693
2694    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2695      super(namespaceName);
2696    }
2697
2698    @Override
2699    String getOperationType() {
2700      return "DELETE_NAMESPACE";
2701    }
2702  }
2703
2704  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2705
2706    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2707      super(namespaceName);
2708    }
2709
2710    @Override
2711    String getOperationType() {
2712      return "MODIFY_NAMESPACE";
2713    }
2714  }
2715
2716  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2717
2718    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2719      super(tableName);
2720    }
2721
2722    @Override
2723    String getOperationType() {
2724      return "MERGE_REGIONS";
2725    }
2726  }
2727
2728  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2729
2730    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2731      super(tableName);
2732    }
2733
2734    @Override
2735    String getOperationType() {
2736      return "SPLIT_REGION";
2737    }
2738  }
2739
2740  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2741    private final String peerId;
2742    private final Supplier<String> getOperation;
2743
2744    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2745      this.peerId = peerId;
2746      this.getOperation = getOperation;
2747    }
2748
2749    String getDescription() {
2750      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2751    }
2752
2753    @Override
2754    void onFinished() {
2755      LOG.info(getDescription() + " completed");
2756    }
2757
2758    @Override
2759    void onError(Throwable error) {
2760      LOG.info(getDescription() + " failed with " + error.getMessage());
2761    }
2762  }
2763
2764  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2765    CompletableFuture<Void> future = new CompletableFuture<>();
2766    addListener(procFuture, (procId, error) -> {
2767      if (error != null) {
2768        future.completeExceptionally(error);
2769        return;
2770      }
2771      getProcedureResult(procId, future, 0);
2772    });
2773    return future;
2774  }
2775
2776  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2777    addListener(
2778      this.<GetProcedureResultResponse> newMasterCaller()
2779        .action((controller, stub) -> this
2780          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2781            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2782            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2783        .call(),
2784      (response, error) -> {
2785        if (error != null) {
2786          LOG.warn("failed to get the procedure result procId={}", procId,
2787            ConnectionUtils.translateException(error));
2788          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2789            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2790          return;
2791        }
2792        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2793          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2794            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2795          return;
2796        }
2797        if (response.hasException()) {
2798          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2799          future.completeExceptionally(ioe);
2800        } else {
2801          future.complete(null);
2802        }
2803      });
2804  }
2805
2806  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2807    CompletableFuture<T> future = new CompletableFuture<>();
2808    future.completeExceptionally(error);
2809    return future;
2810  }
2811
2812  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2813    if (error != null) {
2814      future.completeExceptionally(error);
2815      return true;
2816    }
2817    return false;
2818  }
2819
2820  @Override
2821  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2822    return getClusterMetrics(EnumSet.allOf(Option.class));
2823  }
2824
2825  @Override
2826  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2827    return this
2828        .<ClusterMetrics> newMasterCaller()
2829        .action(
2830          (controller, stub) -> this
2831              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2832                stub, RequestConverter.buildGetClusterStatusRequest(options),
2833                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2834                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2835  }
2836
2837  @Override
2838  public CompletableFuture<Void> shutdown() {
2839    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2840      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2841        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2842        resp -> null))
2843      .call();
2844  }
2845
2846  @Override
2847  public CompletableFuture<Void> stopMaster() {
2848    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2849      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2850        controller, stub, StopMasterRequest.newBuilder().build(),
2851        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2852      .call();
2853  }
2854
2855  @Override
2856  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2857    StopServerRequest request = RequestConverter
2858      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2859    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2860      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2861        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2862        resp -> null))
2863      .serverName(serverName).call();
2864  }
2865
2866  @Override
2867  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2868    return this
2869        .<Void> newAdminCaller()
2870        .action(
2871          (controller, stub) -> this
2872              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2873                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2874                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2875        .serverName(serverName).call();
2876  }
2877
2878  @Override
2879  public CompletableFuture<Void> updateConfiguration() {
2880    CompletableFuture<Void> future = new CompletableFuture<Void>();
2881    addListener(
2882      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2883      (status, err) -> {
2884        if (err != null) {
2885          future.completeExceptionally(err);
2886        } else {
2887          List<CompletableFuture<Void>> futures = new ArrayList<>();
2888          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2889          futures.add(updateConfiguration(status.getMasterName()));
2890          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2891          addListener(
2892            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2893            (result, err2) -> {
2894              if (err2 != null) {
2895                future.completeExceptionally(err2);
2896              } else {
2897                future.complete(result);
2898              }
2899            });
2900        }
2901      });
2902    return future;
2903  }
2904
2905  @Override
2906  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2907    return this
2908        .<Void> newAdminCaller()
2909        .action(
2910          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2911            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2912            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2913        .serverName(serverName).call();
2914  }
2915
2916  @Override
2917  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2918    return this
2919        .<Void> newAdminCaller()
2920        .action(
2921          (controller, stub) -> this
2922              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2923                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2924                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2925        .serverName(serverName).call();
2926  }
2927
2928  @Override
2929  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2930    return this
2931        .<List<SecurityCapability>> newMasterCaller()
2932        .action(
2933          (controller, stub) -> this
2934              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
2935                  call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
2936                    (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2937                    (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2938        .call();
2939  }
2940
2941  @Override
2942  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2943    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2944  }
2945
2946  @Override
2947  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2948      TableName tableName) {
2949    Preconditions.checkNotNull(tableName,
2950      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2951    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2952  }
2953
2954  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2955      ServerName serverName) {
2956    return this.<List<RegionMetrics>> newAdminCaller()
2957        .action((controller, stub) -> this
2958            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2959              adminCall(controller, stub, request, (s, c, req, done) ->
2960                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2961        .serverName(serverName).call();
2962  }
2963
2964  @Override
2965  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2966    return this
2967        .<Boolean> newMasterCaller()
2968        .action(
2969          (controller, stub) -> this
2970              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2971                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2972                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2973                resp -> resp.getInMaintenanceMode())).call();
2974  }
2975
2976  @Override
2977  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2978      CompactType compactType) {
2979    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2980
2981    switch (compactType) {
2982      case MOB:
2983        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
2984          if (err != null) {
2985            future.completeExceptionally(err);
2986            return;
2987          }
2988          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2989
2990          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2991            .action((controller, stub) -> this
2992              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2993                controller, stub,
2994                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2995                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2996            .call(), (resp2, err2) -> {
2997              if (err2 != null) {
2998                future.completeExceptionally(err2);
2999              } else {
3000                if (resp2.hasCompactionState()) {
3001                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3002                } else {
3003                  future.complete(CompactionState.NONE);
3004                }
3005              }
3006            });
3007        });
3008        break;
3009      case NORMAL:
3010        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3011          if (err != null) {
3012            future.completeExceptionally(err);
3013            return;
3014          }
3015          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3016          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3017          locations.stream().filter(loc -> loc.getServerName() != null)
3018            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3019            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3020              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3021                // If any region compaction state is MAJOR_AND_MINOR
3022                // the table compaction state is MAJOR_AND_MINOR, too.
3023                if (err2 != null) {
3024                  future.completeExceptionally(unwrapCompletionException(err2));
3025                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3026                  future.complete(regionState);
3027                } else {
3028                  regionStates.add(regionState);
3029                }
3030              }));
3031            });
3032          addListener(
3033            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3034            (ret, err3) -> {
3035              // If future not completed, check all regions's compaction state
3036              if (!future.isCompletedExceptionally() && !future.isDone()) {
3037                CompactionState state = CompactionState.NONE;
3038                for (CompactionState regionState : regionStates) {
3039                  switch (regionState) {
3040                    case MAJOR:
3041                      if (state == CompactionState.MINOR) {
3042                        future.complete(CompactionState.MAJOR_AND_MINOR);
3043                      } else {
3044                        state = CompactionState.MAJOR;
3045                      }
3046                      break;
3047                    case MINOR:
3048                      if (state == CompactionState.MAJOR) {
3049                        future.complete(CompactionState.MAJOR_AND_MINOR);
3050                      } else {
3051                        state = CompactionState.MINOR;
3052                      }
3053                      break;
3054                    case NONE:
3055                    default:
3056                  }
3057                }
3058                if (!future.isDone()) {
3059                  future.complete(state);
3060                }
3061              }
3062            });
3063        });
3064        break;
3065      default:
3066        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3067    }
3068
3069    return future;
3070  }
3071
3072  @Override
3073  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3074    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3075    addListener(getRegionLocation(regionName), (location, err) -> {
3076      if (err != null) {
3077        future.completeExceptionally(err);
3078        return;
3079      }
3080      ServerName serverName = location.getServerName();
3081      if (serverName == null) {
3082        future
3083          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3084        return;
3085      }
3086      addListener(
3087        this.<GetRegionInfoResponse> newAdminCaller()
3088          .action((controller, stub) -> this
3089            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3090              controller, stub,
3091              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3092                true),
3093              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3094          .serverName(serverName).call(),
3095        (resp2, err2) -> {
3096          if (err2 != null) {
3097            future.completeExceptionally(err2);
3098          } else {
3099            if (resp2.hasCompactionState()) {
3100              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3101            } else {
3102              future.complete(CompactionState.NONE);
3103            }
3104          }
3105        });
3106    });
3107    return future;
3108  }
3109
3110  @Override
3111  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3112    MajorCompactionTimestampRequest request =
3113        MajorCompactionTimestampRequest.newBuilder()
3114            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3115    return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3116        this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3117            call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3118                c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3119  }
3120
3121  @Override
3122  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3123      byte[] regionName) {
3124    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3125    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3126    addListener(getRegionInfo(regionName), (region, err) -> {
3127      if (err != null) {
3128        future.completeExceptionally(err);
3129        return;
3130      }
3131      MajorCompactionTimestampForRegionRequest.Builder builder =
3132        MajorCompactionTimestampForRegionRequest.newBuilder();
3133      builder.setRegion(
3134        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3135      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3136        .<MajorCompactionTimestampForRegionRequest,
3137        MajorCompactionTimestampResponse, Optional<Long>> call(
3138          controller, stub, builder.build(),
3139          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3140          ProtobufUtil::toOptionalTimestamp))
3141        .call(), (timestamp, err2) -> {
3142          if (err2 != null) {
3143            future.completeExceptionally(err2);
3144          } else {
3145            future.complete(timestamp);
3146          }
3147        });
3148    });
3149    return future;
3150  }
3151
3152  @Override
3153  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3154      List<String> serverNamesList) {
3155    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3156    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3157      if (err != null) {
3158        future.completeExceptionally(err);
3159        return;
3160      }
3161      // Accessed by multiple threads.
3162      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3163      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3164      serverNames.stream().forEach(serverName -> {
3165        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3166          if (err2 != null) {
3167            future.completeExceptionally(unwrapCompletionException(err2));
3168          } else {
3169            serverStates.put(serverName, serverState);
3170          }
3171        }));
3172      });
3173      addListener(
3174        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3175        (ret, err3) -> {
3176          if (!future.isCompletedExceptionally()) {
3177            if (err3 != null) {
3178              future.completeExceptionally(err3);
3179            } else {
3180              future.complete(serverStates);
3181            }
3182          }
3183        });
3184    });
3185    return future;
3186  }
3187
3188  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3189    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3190    if (serverNamesList.isEmpty()) {
3191      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3192        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3193      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3194        if (err != null) {
3195          future.completeExceptionally(err);
3196        } else {
3197          future.complete(clusterMetrics.getServersName());
3198        }
3199      });
3200      return future;
3201    } else {
3202      List<ServerName> serverList = new ArrayList<>();
3203      for (String regionServerName : serverNamesList) {
3204        ServerName serverName = null;
3205        try {
3206          serverName = ServerName.valueOf(regionServerName);
3207        } catch (Exception e) {
3208          future.completeExceptionally(
3209            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3210        }
3211        if (serverName == null) {
3212          future.completeExceptionally(
3213            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3214        } else {
3215          serverList.add(serverName);
3216        }
3217      }
3218      future.complete(serverList);
3219    }
3220    return future;
3221  }
3222
3223  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3224    return this
3225        .<Boolean>newAdminCaller()
3226        .serverName(serverName)
3227        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3228            Boolean>adminCall(controller, stub,
3229            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3230            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3231  }
3232
3233  @Override
3234  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3235    return this.<Boolean> newMasterCaller()
3236      .action((controller, stub) -> this
3237        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3238          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3239          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3240          (resp) -> resp.getPrevBalanceValue()))
3241      .call();
3242  }
3243
3244  @Override
3245  public CompletableFuture<Boolean> balance(boolean forcible) {
3246    return this
3247        .<Boolean> newMasterCaller()
3248        .action(
3249          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3250            stub, RequestConverter.buildBalanceRequest(forcible),
3251            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3252  }
3253
3254  @Override
3255  public CompletableFuture<Boolean> isBalancerEnabled() {
3256    return this
3257        .<Boolean> newMasterCaller()
3258        .action((controller, stub) ->
3259              this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3260            stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3261                  -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3262  }
3263
3264  @Override
3265  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3266    return this
3267        .<Boolean> newMasterCaller()
3268        .action(
3269          (controller, stub) -> this
3270              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3271                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3272                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3273                    .getPrevNormalizerValue())).call();
3274  }
3275
3276  @Override
3277  public CompletableFuture<Boolean> isNormalizerEnabled() {
3278    return this
3279        .<Boolean> newMasterCaller()
3280        .action(
3281          (controller, stub) -> this
3282              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3283                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3284                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3285                (resp) -> resp.getEnabled())).call();
3286  }
3287
3288  @Override
3289  public CompletableFuture<Boolean> normalize() {
3290    return this
3291        .<Boolean> newMasterCaller()
3292        .action(
3293          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3294            controller, stub, RequestConverter.buildNormalizeRequest(),
3295            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3296        .call();
3297  }
3298
3299  @Override
3300  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3301    return this
3302        .<Boolean> newMasterCaller()
3303        .action(
3304          (controller, stub) -> this
3305              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3306                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3307                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3308                    .getPrevValue())).call();
3309  }
3310
3311  @Override
3312  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3313    return this
3314        .<Boolean> newMasterCaller()
3315        .action(
3316          (controller, stub) -> this
3317              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3318                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3319                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3320        .call();
3321  }
3322
3323  @Override
3324  public CompletableFuture<Boolean> runCleanerChore() {
3325    return this
3326        .<Boolean> newMasterCaller()
3327        .action(
3328          (controller, stub) -> this
3329              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3330                RequestConverter.buildRunCleanerChoreRequest(),
3331                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3332                (resp) -> resp.getCleanerChoreRan())).call();
3333  }
3334
3335  @Override
3336  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3337    return this
3338        .<Boolean> newMasterCaller()
3339        .action(
3340          (controller, stub) -> this
3341              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3342                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3343                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3344                    .getPrevValue())).call();
3345  }
3346
3347  @Override
3348  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3349    return this
3350        .<Boolean> newMasterCaller()
3351        .action(
3352          (controller, stub) -> this
3353              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3354                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3355                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3356                    .getValue())).call();
3357  }
3358
3359  @Override
3360  public CompletableFuture<Integer> runCatalogJanitor() {
3361    return this
3362        .<Integer> newMasterCaller()
3363        .action(
3364          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3365            controller, stub, RequestConverter.buildCatalogScanRequest(),
3366            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3367        .call();
3368  }
3369
3370  @Override
3371  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3372      ServiceCaller<S, R> callable) {
3373    MasterCoprocessorRpcChannelImpl channel =
3374        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3375    S stub = stubMaker.apply(channel);
3376    CompletableFuture<R> future = new CompletableFuture<>();
3377    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3378    callable.call(stub, controller, resp -> {
3379      if (controller.failed()) {
3380        future.completeExceptionally(controller.getFailed());
3381      } else {
3382        future.complete(resp);
3383      }
3384    });
3385    return future;
3386  }
3387
3388  @Override
3389  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3390      ServiceCaller<S, R> callable, ServerName serverName) {
3391    RegionServerCoprocessorRpcChannelImpl channel =
3392        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3393          serverName));
3394    S stub = stubMaker.apply(channel);
3395    CompletableFuture<R> future = new CompletableFuture<>();
3396    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3397    callable.call(stub, controller, resp -> {
3398      if (controller.failed()) {
3399        future.completeExceptionally(controller.getFailed());
3400      } else {
3401        future.complete(resp);
3402      }
3403    });
3404    return future;
3405  }
3406
3407  @Override
3408  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3409    return this.<List<ServerName>> newMasterCaller()
3410      .action((controller, stub) -> this
3411        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3412          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3413          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3414          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3415      .call();
3416  }
3417
3418  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3419    return this.connection.callerFactory.<T> serverRequest()
3420      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3421      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3422      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3423      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3424  }
3425
3426  @Override
3427  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3428    if (tableName == null) {
3429      return failedFuture(new IllegalArgumentException("Table name is null"));
3430    }
3431    CompletableFuture<Void> future = new CompletableFuture<>();
3432    addListener(tableExists(tableName), (exist, err) -> {
3433      if (err != null) {
3434        future.completeExceptionally(err);
3435        return;
3436      }
3437      if (!exist) {
3438        future.completeExceptionally(new TableNotFoundException(
3439          "Table '" + tableName.getNameAsString() + "' does not exists."));
3440        return;
3441      }
3442      addListener(getTableSplits(tableName), (splits, err1) -> {
3443        if (err1 != null) {
3444          future.completeExceptionally(err1);
3445        } else {
3446          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3447            if (err2 != null) {
3448              future.completeExceptionally(err2);
3449            } else {
3450              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3451                if (err3 != null) {
3452                  future.completeExceptionally(err3);
3453                } else {
3454                  future.complete(result3);
3455                }
3456              });
3457            }
3458          });
3459        }
3460      });
3461    });
3462    return future;
3463  }
3464
3465  @Override
3466  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3467    if (tableName == null) {
3468      return failedFuture(new IllegalArgumentException("Table name is null"));
3469    }
3470    CompletableFuture<Void> future = new CompletableFuture<>();
3471    addListener(tableExists(tableName), (exist, err) -> {
3472      if (err != null) {
3473        future.completeExceptionally(err);
3474        return;
3475      }
3476      if (!exist) {
3477        future.completeExceptionally(new TableNotFoundException(
3478          "Table '" + tableName.getNameAsString() + "' does not exists."));
3479        return;
3480      }
3481      addListener(setTableReplication(tableName, false), (result, err2) -> {
3482        if (err2 != null) {
3483          future.completeExceptionally(err2);
3484        } else {
3485          future.complete(result);
3486        }
3487      });
3488    });
3489    return future;
3490  }
3491
3492  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3493    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3494    addListener(
3495      getRegions(tableName).thenApply(regions -> regions.stream()
3496        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3497      (regions, err2) -> {
3498        if (err2 != null) {
3499          future.completeExceptionally(err2);
3500          return;
3501        }
3502        if (regions.size() == 1) {
3503          future.complete(null);
3504        } else {
3505          byte[][] splits = new byte[regions.size() - 1][];
3506          for (int i = 1; i < regions.size(); i++) {
3507            splits[i - 1] = regions.get(i).getStartKey();
3508          }
3509          future.complete(splits);
3510        }
3511      });
3512    return future;
3513  }
3514
3515  /**
3516   * Connect to peer and check the table descriptor on peer:
3517   * <ol>
3518   * <li>Create the same table on peer when not exist.</li>
3519   * <li>Throw an exception if the table already has replication enabled on any of the column
3520   * families.</li>
3521   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3522   * </ol>
3523   * @param tableName name of the table to sync to the peer
3524   * @param splits table split keys
3525   */
3526  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3527      byte[][] splits) {
3528    CompletableFuture<Void> future = new CompletableFuture<>();
3529    addListener(listReplicationPeers(), (peers, err) -> {
3530      if (err != null) {
3531        future.completeExceptionally(err);
3532        return;
3533      }
3534      if (peers == null || peers.size() <= 0) {
3535        future.completeExceptionally(
3536          new IllegalArgumentException("Found no peer cluster for replication."));
3537        return;
3538      }
3539      List<CompletableFuture<Void>> futures = new ArrayList<>();
3540      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3541        .forEach(peer -> {
3542          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3543        });
3544      addListener(
3545        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3546        (result, err2) -> {
3547          if (err2 != null) {
3548            future.completeExceptionally(err2);
3549          } else {
3550            future.complete(result);
3551          }
3552        });
3553    });
3554    return future;
3555  }
3556
3557  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3558      ReplicationPeerDescription peer) {
3559    Configuration peerConf = null;
3560    try {
3561      peerConf =
3562        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3563    } catch (IOException e) {
3564      return failedFuture(e);
3565    }
3566    CompletableFuture<Void> future = new CompletableFuture<>();
3567    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3568      if (err != null) {
3569        future.completeExceptionally(err);
3570        return;
3571      }
3572      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3573        if (err1 != null) {
3574          future.completeExceptionally(err1);
3575          return;
3576        }
3577        AsyncAdmin peerAdmin = conn.getAdmin();
3578        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3579          if (err2 != null) {
3580            future.completeExceptionally(err2);
3581            return;
3582          }
3583          if (!exist) {
3584            CompletableFuture<Void> createTableFuture = null;
3585            if (splits == null) {
3586              createTableFuture = peerAdmin.createTable(tableDesc);
3587            } else {
3588              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3589            }
3590            addListener(createTableFuture, (result, err3) -> {
3591              if (err3 != null) {
3592                future.completeExceptionally(err3);
3593              } else {
3594                future.complete(result);
3595              }
3596            });
3597          } else {
3598            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3599              (result, err4) -> {
3600                if (err4 != null) {
3601                  future.completeExceptionally(err4);
3602                } else {
3603                  future.complete(result);
3604                }
3605              });
3606          }
3607        });
3608      });
3609    });
3610    return future;
3611  }
3612
3613  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3614      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3615    CompletableFuture<Void> future = new CompletableFuture<>();
3616    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3617      if (err != null) {
3618        future.completeExceptionally(err);
3619        return;
3620      }
3621      if (peerTableDesc == null) {
3622        future.completeExceptionally(
3623          new IllegalArgumentException("Failed to get table descriptor for table " +
3624            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3625        return;
3626      }
3627      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3628        future.completeExceptionally(new IllegalArgumentException(
3629          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3630            ", but the table descriptors are not same when compared with source cluster." +
3631            " Thus can not enable the table's replication switch."));
3632        return;
3633      }
3634      future.complete(null);
3635    });
3636    return future;
3637  }
3638
3639  /**
3640   * Set the table's replication switch if the table's replication switch is already not set.
3641   * @param tableName name of the table
3642   * @param enableRep is replication switch enable or disable
3643   */
3644  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3645    CompletableFuture<Void> future = new CompletableFuture<>();
3646    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3647      if (err != null) {
3648        future.completeExceptionally(err);
3649        return;
3650      }
3651      if (!tableDesc.matchReplicationScope(enableRep)) {
3652        int scope =
3653          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3654        TableDescriptor newTableDesc =
3655          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3656        addListener(modifyTable(newTableDesc), (result, err2) -> {
3657          if (err2 != null) {
3658            future.completeExceptionally(err2);
3659          } else {
3660            future.complete(result);
3661          }
3662        });
3663      } else {
3664        future.complete(null);
3665      }
3666    });
3667    return future;
3668  }
3669
3670  @Override
3671  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3672    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3673    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3674      if (err != null) {
3675        future.completeExceptionally(err);
3676        return;
3677      }
3678      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3679        locations.stream().filter(l -> l.getRegion() != null)
3680          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3681          .collect(Collectors.groupingBy(l -> l.getServerName(),
3682            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3683      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3684      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3685      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3686        futures
3687          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3688            if (err2 != null) {
3689              future.completeExceptionally(unwrapCompletionException(err2));
3690            } else {
3691              aggregator.append(stats);
3692            }
3693          }));
3694      }
3695      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3696        (ret, err3) -> {
3697          if (err3 != null) {
3698            future.completeExceptionally(unwrapCompletionException(err3));
3699          } else {
3700            future.complete(aggregator.sum());
3701          }
3702        });
3703    });
3704    return future;
3705  }
3706
3707  @Override
3708  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3709      boolean preserveSplits) {
3710    CompletableFuture<Void> future = new CompletableFuture<>();
3711    addListener(tableExists(tableName), (exist, err) -> {
3712      if (err != null) {
3713        future.completeExceptionally(err);
3714        return;
3715      }
3716      if (!exist) {
3717        future.completeExceptionally(new TableNotFoundException(tableName));
3718        return;
3719      }
3720      addListener(tableExists(newTableName), (exist1, err1) -> {
3721        if (err1 != null) {
3722          future.completeExceptionally(err1);
3723          return;
3724        }
3725        if (exist1) {
3726          future.completeExceptionally(new TableExistsException(newTableName));
3727          return;
3728        }
3729        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3730          if (err2 != null) {
3731            future.completeExceptionally(err2);
3732            return;
3733          }
3734          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3735          if (preserveSplits) {
3736            addListener(getTableSplits(tableName), (splits, err3) -> {
3737              if (err3 != null) {
3738                future.completeExceptionally(err3);
3739              } else {
3740                addListener(
3741                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3742                  (result, err4) -> {
3743                    if (err4 != null) {
3744                      future.completeExceptionally(err4);
3745                    } else {
3746                      future.complete(result);
3747                    }
3748                  });
3749              }
3750            });
3751          } else {
3752            addListener(createTable(newTableDesc), (result, err5) -> {
3753              if (err5 != null) {
3754                future.completeExceptionally(err5);
3755              } else {
3756                future.complete(result);
3757              }
3758            });
3759          }
3760        });
3761      });
3762    });
3763    return future;
3764  }
3765
3766  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3767      List<RegionInfo> hris) {
3768    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3769      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3770        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3771        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3772        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3773      .serverName(serverName).call();
3774  }
3775
3776  @Override
3777  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3778    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3779        .action((controller, stub) -> this
3780            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3781              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3782              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3783              resp -> resp.getPreviousRpcThrottleEnabled()))
3784        .call();
3785    return future;
3786  }
3787
3788  @Override
3789  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3790    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3791        .action((controller, stub) -> this
3792            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3793              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3794              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3795              resp -> resp.getRpcThrottleEnabled()))
3796        .call();
3797    return future;
3798  }
3799
3800  @Override
3801  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3802    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3803        .action((controller, stub) -> this
3804            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3805              controller, stub,
3806              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3807                  .build(),
3808              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3809              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3810        .call();
3811    return future;
3812  }
3813
3814  @Override
3815  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3816    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3817      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3818      Map<TableName, Long>> call(controller, stub,
3819        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3820        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3821        resp -> resp.getSizesList().stream().collect(Collectors
3822          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3823      .call();
3824  }
3825
3826  @Override
3827  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3828      ServerName serverName) {
3829    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3830      .action((controller, stub) -> this
3831        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3832        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3833          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3834          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3835          resp -> resp.getSnapshotsList().stream()
3836            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3837              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3838      .serverName(serverName).call();
3839  }
3840
3841  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3842      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3843    return this.<SpaceQuotaSnapshot> newMasterCaller()
3844      .action((controller, stub) -> this
3845        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3846          RequestConverter.buildGetQuotaStatesRequest(),
3847          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3848      .call();
3849  }
3850
3851  @Override
3852  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3853    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3854      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3855      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3856  }
3857
3858  @Override
3859  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3860    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3861    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3862      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3863      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3864  }
3865
3866  @Override
3867  public CompletableFuture<Void> grant(UserPermission userPermission,
3868      boolean mergeExistingPermissions) {
3869    return this.<Void> newMasterCaller()
3870        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3871          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3872          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3873        .call();
3874  }
3875
3876  @Override
3877  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3878    return this.<Void> newMasterCaller()
3879        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3880          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3881          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3882        .call();
3883  }
3884
3885  @Override
3886  public CompletableFuture<List<UserPermission>>
3887      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3888    return this.<List<UserPermission>> newMasterCaller().action((controller,
3889        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3890            List<UserPermission>> call(controller, stub,
3891              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3892              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3893              resp -> resp.getUserPermissionList().stream()
3894                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3895                .collect(Collectors.toList())))
3896        .call();
3897  }
3898
3899  @Override
3900  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3901      List<Permission> permissions) {
3902    return this.<List<Boolean>> newMasterCaller()
3903        .action((controller, stub) -> this
3904            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3905              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3906              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3907              resp -> resp.getHasUserPermissionList()))
3908        .call();
3909  }
3910
3911  @Override
3912  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3913      final boolean sync) {
3914    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3915        .call(controller, stub, RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3916            MasterService.Interface::switchSnapshotCleanup,
3917            SetSnapshotCleanupResponse::getPrevSnapshotCleanup)).call();
3918  }
3919
3920  @Override
3921  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3922    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3923        .call(controller, stub, RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3924            MasterService.Interface::isSnapshotCleanupEnabled,
3925            IsSnapshotCleanupEnabledResponse::getEnabled)).call();
3926  }
3927
3928  @Override
3929  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
3930    return this.<Void> newMasterCaller()
3931        .action((controller, stub) -> this.
3932            <MoveServersRequest, MoveServersResponse, Void> call(controller, stub,
3933                RequestConverter.buildMoveServersRequest(servers, groupName),
3934              (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
3935        .call();
3936  }
3937
3938  @Override
3939  public CompletableFuture<Void> addRSGroup(String groupName) {
3940    return this.<Void> newMasterCaller()
3941        .action(((controller, stub) -> this.
3942            <AddRSGroupRequest, AddRSGroupResponse, Void> call(controller, stub,
3943                AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3944              (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)))
3945        .call();
3946  }
3947
3948  @Override
3949  public CompletableFuture<Void> removeRSGroup(String groupName) {
3950    return this.<Void> newMasterCaller()
3951        .action((controller, stub) -> this.
3952            <RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(controller, stub,
3953                RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3954              (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
3955        .call();
3956  }
3957
3958  @Override
3959  public CompletableFuture<Boolean> balanceRSGroup(String groupName) {
3960    return this.<Boolean> newMasterCaller()
3961        .action((controller, stub) -> this.
3962            <BalanceRSGroupRequest, BalanceRSGroupResponse, Boolean> call(controller, stub,
3963                BalanceRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3964              (s, c, req, done) -> s.balanceRSGroup(c, req, done), resp -> resp.getBalanceRan()))
3965        .call();
3966  }
3967
3968  @Override
3969  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
3970    return this.<List<RSGroupInfo>> newMasterCaller()
3971        .action((controller, stub) -> this
3972            .<ListRSGroupInfosRequest, ListRSGroupInfosResponse, List<RSGroupInfo>> call(
3973                controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
3974              (s, c, req, done) -> s.listRSGroupInfos(c, req, done),
3975              resp -> resp.getRSGroupInfoList().stream()
3976                  .map(r -> ProtobufUtil.toGroupInfo(r))
3977                  .collect(Collectors.toList())))
3978        .call();
3979  }
3980
3981  @Override
3982  public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
3983      @Nullable final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
3984    if (CollectionUtils.isEmpty(serverNames)) {
3985      return CompletableFuture.completedFuture(Collections.emptyList());
3986    }
3987    if (logQueryFilter.getType() == null
3988        || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
3989      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3990        .map((ServerName serverName) ->
3991          getSlowLogResponseFromServer(serverName, logQueryFilter))
3992        .map(CompletableFuture::join)
3993        .flatMap(List::stream)
3994        .collect(Collectors.toList()));
3995    } else {
3996      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3997        .map((ServerName serverName) ->
3998          getLargeLogResponseFromServer(serverName, logQueryFilter))
3999        .map(CompletableFuture::join)
4000        .flatMap(List::stream)
4001        .collect(Collectors.toList()));
4002    }
4003  }
4004
4005  private CompletableFuture<List<OnlineLogRecord>> getSlowLogResponseFromServer(
4006      final ServerName serverName, final LogQueryFilter logQueryFilter) {
4007    return this.<List<OnlineLogRecord>>newAdminCaller()
4008      .action((controller, stub) -> this
4009        .adminCall(
4010          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
4011          AdminService.Interface::getSlowLogResponses,
4012          ProtobufUtil::toSlowLogPayloads))
4013      .serverName(serverName).call();
4014  }
4015
4016  private CompletableFuture<List<OnlineLogRecord>> getLargeLogResponseFromServer(
4017    final ServerName serverName, final LogQueryFilter logQueryFilter) {
4018    return this.<List<OnlineLogRecord>>newAdminCaller()
4019      .action((controller, stub) -> this
4020        .adminCall(
4021          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
4022          AdminService.Interface::getLargeLogResponses,
4023          ProtobufUtil::toSlowLogPayloads))
4024      .serverName(serverName).call();
4025  }
4026
4027  @Override
4028  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
4029      @Nullable Set<ServerName> serverNames) {
4030    if (CollectionUtils.isEmpty(serverNames)) {
4031      return CompletableFuture.completedFuture(Collections.emptyList());
4032    }
4033    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
4034      .map(this::clearSlowLogsResponses)
4035      .collect(Collectors.toList());
4036    return convertToFutureOfList(clearSlowLogResponseList);
4037  }
4038
4039  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4040    return this.<Boolean>newAdminCaller()
4041      .action(((controller, stub) -> this
4042        .adminCall(
4043          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
4044          AdminService.Interface::clearSlowLogsResponses,
4045          ProtobufUtil::toClearSlowLogPayload))
4046      ).serverName(serverName).call();
4047  }
4048
4049  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
4050    List<CompletableFuture<T>> futures) {
4051    CompletableFuture<Void> allDoneFuture =
4052      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4053    return allDoneFuture.thenApply(v ->
4054      futures.stream()
4055        .map(CompletableFuture::join)
4056        .collect(Collectors.toList())
4057    );
4058  }
4059
4060  @Override
4061  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4062    return this.<List<TableName>> newMasterCaller()
4063      .action((controller, stub) -> this
4064        .<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, List<TableName>> call(controller,
4065          stub, ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4066          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4067            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4068      .call();
4069  }
4070
4071  @Override
4072  public CompletableFuture<Pair<List<String>, List<TableName>>>
4073    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4074    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4075      .action((controller, stub) -> this
4076        .<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4077          GetConfiguredNamespacesAndTablesInRSGroupResponse,
4078          Pair<List<String>, List<TableName>>> call(controller, stub,
4079          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4080            .build(),
4081            (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4082            resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4083              .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4084      .call();
4085  }
4086
4087  @Override
4088  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4089    return this.<RSGroupInfo> newMasterCaller()
4090      .action(((controller, stub) -> this
4091        .<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse, RSGroupInfo> call(
4092          controller, stub,
4093          GetRSGroupInfoOfServerRequest.newBuilder()
4094            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4095              .setPort(hostPort.getPort()).build())
4096            .build(),
4097          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4098          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4099      .call();
4100  }
4101
4102  @Override
4103  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4104    return this.<Void> newMasterCaller()
4105      .action((controller, stub) -> this.
4106            <RemoveServersRequest, RemoveServersResponse, Void> call(controller, stub,
4107                RequestConverter.buildRemoveServersRequest(servers),
4108              (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4109        .call();
4110  }
4111
4112  @Override
4113  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4114    CompletableFuture<Void> future = new CompletableFuture<>();
4115    for (TableName tableName : tables) {
4116      addListener(tableExists(tableName), (exist, err) -> {
4117        if (err != null) {
4118          future.completeExceptionally(err);
4119          return;
4120        }
4121        if (!exist) {
4122          future.completeExceptionally(new TableNotFoundException(tableName));
4123          return;
4124        }
4125      });
4126    }
4127    addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> {
4128      if (err != null) {
4129        future.completeExceptionally(err);
4130        return;
4131      }
4132      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4133        future.complete(null);
4134        return;
4135      }
4136      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4137      for (TableDescriptor td : tableDescriptions) {
4138        newTableDescriptors
4139            .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4140      }
4141      addListener(CompletableFuture.allOf(
4142        newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4143        (v, e) -> {
4144          if (e != null) {
4145            future.completeExceptionally(e);
4146          } else {
4147            future.complete(v);
4148          }
4149        });
4150    }));
4151    return future;
4152  }
4153
4154  @Override
4155  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4156    return this.<RSGroupInfo> newMasterCaller().action(((controller, stub) -> this
4157      .<GetRSGroupInfoOfTableRequest, GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller,
4158        stub,
4159        GetRSGroupInfoOfTableRequest.newBuilder().setTableName(ProtobufUtil.toProtoTableName(table))
4160          .build(),
4161        (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4162        resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4163      .call();
4164  }
4165
4166  @Override
4167  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4168    return this.<RSGroupInfo> newMasterCaller()
4169      .action(((controller, stub) -> this
4170        .<GetRSGroupInfoRequest, GetRSGroupInfoResponse, RSGroupInfo> call(controller, stub,
4171          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4172          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4173          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4174      .call();
4175  }
4176
4177  @Override
4178  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4179    return this.<Void> newMasterCaller()
4180      .action(
4181        (
4182          (controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4183            controller,
4184            stub,
4185            RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName).setNewRsgroupName(newName)
4186                                .build(),
4187            (s, c, req, done) -> s.renameRSGroup(c, req, done),
4188            resp -> null
4189          )
4190        )
4191      ).call();
4192  }
4193
4194  @Override
4195  public CompletableFuture<Void>
4196    updateRSGroupConfig(String groupName, Map<String, String> configuration) {
4197    UpdateRSGroupConfigRequest.Builder request = UpdateRSGroupConfigRequest.newBuilder()
4198        .setGroupName(groupName);
4199    if (configuration != null) {
4200      configuration.entrySet().forEach(e ->
4201          request.addConfiguration(NameStringPair.newBuilder().setName(e.getKey())
4202              .setValue(e.getValue()).build()));
4203    }
4204    return this.<Void> newMasterCaller()
4205        .action(((controller, stub) ->
4206            this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
4207                controller, stub, request.build(),
4208              (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4209        ).call();
4210  }
4211}