001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.EnumSet;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.Set;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.ConcurrentLinkedQueue;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicReference;
040import java.util.function.BiConsumer;
041import java.util.function.Consumer;
042import java.util.function.Function;
043import java.util.function.Supplier;
044import java.util.regex.Pattern;
045import java.util.stream.Collectors;
046import java.util.stream.Stream;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.hbase.CacheEvictionStats;
049import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
050import org.apache.hadoop.hbase.CatalogFamilyFormat;
051import org.apache.hadoop.hbase.ClientMetaTableAccessor;
052import org.apache.hadoop.hbase.ClusterMetrics;
053import org.apache.hadoop.hbase.ClusterMetrics.Option;
054import org.apache.hadoop.hbase.ClusterMetricsBuilder;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.NamespaceDescriptor;
058import org.apache.hadoop.hbase.RegionLocations;
059import org.apache.hadoop.hbase.RegionMetrics;
060import org.apache.hadoop.hbase.RegionMetricsBuilder;
061import org.apache.hadoop.hbase.ServerName;
062import org.apache.hadoop.hbase.TableExistsException;
063import org.apache.hadoop.hbase.TableName;
064import org.apache.hadoop.hbase.TableNotDisabledException;
065import org.apache.hadoop.hbase.TableNotEnabledException;
066import org.apache.hadoop.hbase.TableNotFoundException;
067import org.apache.hadoop.hbase.UnknownRegionException;
068import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.Scan.ReadType;
072import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
073import org.apache.hadoop.hbase.client.replication.TableCFs;
074import org.apache.hadoop.hbase.client.security.SecurityCapability;
075import org.apache.hadoop.hbase.exceptions.DeserializationException;
076import org.apache.hadoop.hbase.ipc.HBaseRpcController;
077import org.apache.hadoop.hbase.net.Address;
078import org.apache.hadoop.hbase.quotas.QuotaFilter;
079import org.apache.hadoop.hbase.quotas.QuotaSettings;
080import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
081import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
082import org.apache.hadoop.hbase.replication.ReplicationException;
083import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
084import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
085import org.apache.hadoop.hbase.replication.SyncReplicationState;
086import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
087import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
088import org.apache.hadoop.hbase.security.access.Permission;
089import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
090import org.apache.hadoop.hbase.security.access.UserPermission;
091import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
092import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
093import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
094import org.apache.hadoop.hbase.util.Bytes;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
097import org.apache.hadoop.hbase.util.Pair;
098import org.apache.yetus.audience.InterfaceAudience;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
103import org.apache.hbase.thirdparty.com.google.protobuf.Message;
104import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
106import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
107import org.apache.hbase.thirdparty.io.netty.util.Timeout;
108import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
109import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
110
111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
112import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
330
331/**
332 * The implementation of AsyncAdmin.
333 * <p>
334 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
335 * be finished inside the rpc framework thread, which means that the callbacks registered to the
336 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
337 * this class should not try to do time consuming tasks in the callbacks.
338 * @since 2.0.0
339 * @see AsyncHBaseAdmin
340 * @see AsyncConnection#getAdmin()
341 * @see AsyncConnection#getAdminBuilder()
342 */
343@InterfaceAudience.Private
344class RawAsyncHBaseAdmin implements AsyncAdmin {
345
346  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
347
348  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
349
350  private final AsyncConnectionImpl connection;
351
352  private final HashedWheelTimer retryTimer;
353
354  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
355
356  private final long rpcTimeoutNs;
357
358  private final long operationTimeoutNs;
359
360  private final long pauseNs;
361
362  private final long pauseForCQTBENs;
363
364  private final int maxAttempts;
365
366  private final int startLogErrorsCnt;
367
368  private final NonceGenerator ng;
369
370  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
371      AsyncAdminBuilderBase builder) {
372    this.connection = connection;
373    this.retryTimer = retryTimer;
374    this.metaTable = connection.getTable(META_TABLE_NAME);
375    this.rpcTimeoutNs = builder.rpcTimeoutNs;
376    this.operationTimeoutNs = builder.operationTimeoutNs;
377    this.pauseNs = builder.pauseNs;
378    if (builder.pauseForCQTBENs < builder.pauseNs) {
379      LOG.warn(
380        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
381          " the normal pause value {} ms, use the greater one instead",
382        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
383        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
384      this.pauseForCQTBENs = builder.pauseNs;
385    } else {
386      this.pauseForCQTBENs = builder.pauseForCQTBENs;
387    }
388    this.maxAttempts = builder.maxAttempts;
389    this.startLogErrorsCnt = builder.startLogErrorsCnt;
390    this.ng = connection.getNonceGenerator();
391  }
392
393  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
394    return this.connection.callerFactory.<T> masterRequest()
395      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
396      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
397      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
398      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
399  }
400
401  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
402    return this.connection.callerFactory.<T> adminRequest()
403      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
404      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
405      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
406      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
407  }
408
409  @FunctionalInterface
410  private interface MasterRpcCall<RESP, REQ> {
411    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
412        RpcCallback<RESP> done);
413  }
414
415  @FunctionalInterface
416  private interface AdminRpcCall<RESP, REQ> {
417    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
418        RpcCallback<RESP> done);
419  }
420
421  @FunctionalInterface
422  private interface Converter<D, S> {
423    D convert(S src) throws IOException;
424  }
425
426  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
427      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
428      Converter<RESP, PRESP> respConverter) {
429    CompletableFuture<RESP> future = new CompletableFuture<>();
430    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
431
432      @Override
433      public void run(PRESP resp) {
434        if (controller.failed()) {
435          future.completeExceptionally(controller.getFailed());
436        } else {
437          try {
438            future.complete(respConverter.convert(resp));
439          } catch (IOException e) {
440            future.completeExceptionally(e);
441          }
442        }
443      }
444    });
445    return future;
446  }
447
448  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
449      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
450      Converter<RESP, PRESP> respConverter) {
451    CompletableFuture<RESP> future = new CompletableFuture<>();
452    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
453
454      @Override
455      public void run(PRESP resp) {
456        if (controller.failed()) {
457          future.completeExceptionally(controller.getFailed());
458        } else {
459          try {
460            future.complete(respConverter.convert(resp));
461          } catch (IOException e) {
462            future.completeExceptionally(e);
463          }
464        }
465      }
466    });
467    return future;
468  }
469
470  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
471      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
472      ProcedureBiConsumer consumer) {
473    return procedureCall(b -> {
474    }, preq, rpcCall, respConverter, consumer);
475  }
476
477  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
478      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
479      ProcedureBiConsumer consumer) {
480    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
481  }
482
483  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
484      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
485      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
486      ProcedureBiConsumer consumer) {
487    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
488        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
489    prioritySetter.accept(builder);
490    CompletableFuture<Long> procFuture = builder.call();
491    CompletableFuture<Void> future = waitProcedureResult(procFuture);
492    addListener(future, consumer);
493    return future;
494  }
495
496  @FunctionalInterface
497  private interface TableOperator {
498    CompletableFuture<Void> operate(TableName table);
499  }
500
501  @Override
502  public CompletableFuture<Boolean> tableExists(TableName tableName) {
503    if (TableName.isMetaTableName(tableName)) {
504      return CompletableFuture.completedFuture(true);
505    }
506    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
507  }
508
509  @Override
510  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
511    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
512      includeSysTables));
513  }
514
515  /**
516   * {@link #listTableDescriptors(boolean)}
517   */
518  @Override
519  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
520      boolean includeSysTables) {
521    Preconditions.checkNotNull(pattern,
522      "pattern is null. If you don't specify a pattern, "
523          + "use listTableDescriptors(boolean) instead");
524    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
525      includeSysTables));
526  }
527
528  @Override
529  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
530    Preconditions.checkNotNull(tableNames,
531      "tableNames is null. If you don't specify tableNames, "
532          + "use listTableDescriptors(boolean) instead");
533    if (tableNames.isEmpty()) {
534      return CompletableFuture.completedFuture(Collections.emptyList());
535    }
536    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
537  }
538
539  private CompletableFuture<List<TableDescriptor>>
540      getTableDescriptors(GetTableDescriptorsRequest request) {
541    return this.<List<TableDescriptor>> newMasterCaller()
542        .action((controller, stub) -> this
543            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
544              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
545              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
546        .call();
547  }
548
549  @Override
550  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
551    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
552  }
553
554  @Override
555  public CompletableFuture<List<TableName>>
556      listTableNames(Pattern pattern, boolean includeSysTables) {
557    Preconditions.checkNotNull(pattern,
558        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
559    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
560  }
561
562  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
563    return this
564        .<List<TableName>> newMasterCaller()
565        .action(
566          (controller, stub) -> this
567              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
568                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
569                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
570  }
571
572  @Override
573  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
574    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
575        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
576        List<TableDescriptor>> call(
577          controller, stub,
578          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
579          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
580          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
581        .call();
582  }
583
584  @Override
585  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
586    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
587        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
588        List<TableName>> call(
589          controller, stub,
590          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
591          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
592          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
593        .call();
594  }
595
596  @Override
597  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
598    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
599    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
600      .action((controller, stub) -> this
601        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
602          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
603          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
604          (resp) -> resp.getTableSchemaList()))
605      .call(), (tableSchemas, error) -> {
606        if (error != null) {
607          future.completeExceptionally(error);
608          return;
609        }
610        if (!tableSchemas.isEmpty()) {
611          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
612        } else {
613          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
614        }
615      });
616    return future;
617  }
618
619  @Override
620  public CompletableFuture<Void> createTable(TableDescriptor desc) {
621    return createTable(desc.getTableName(),
622      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
623  }
624
625  @Override
626  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
627      int numRegions) {
628    try {
629      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
630    } catch (IllegalArgumentException e) {
631      return failedFuture(e);
632    }
633  }
634
635  @Override
636  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
637    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
638        + " use createTable(TableDescriptor) instead");
639    try {
640      verifySplitKeys(splitKeys);
641      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
642        splitKeys, ng.getNonceGroup(), ng.newNonce()));
643    } catch (IllegalArgumentException e) {
644      return failedFuture(e);
645    }
646  }
647
648  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
649    Preconditions.checkNotNull(tableName, "table name is null");
650    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
651      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
652      new CreateTableProcedureBiConsumer(tableName));
653  }
654
655  @Override
656  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
657    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
658      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
659        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
660      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
661  }
662
663  @Override
664  public CompletableFuture<Void> deleteTable(TableName tableName) {
665    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
666      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
667      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
668      new DeleteTableProcedureBiConsumer(tableName));
669  }
670
671  @Override
672  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
673    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
674      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
675        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
676      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
677  }
678
679  @Override
680  public CompletableFuture<Void> enableTable(TableName tableName) {
681    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
682      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
683      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
684      new EnableTableProcedureBiConsumer(tableName));
685  }
686
687  @Override
688  public CompletableFuture<Void> disableTable(TableName tableName) {
689    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
690      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
691      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
692      new DisableTableProcedureBiConsumer(tableName));
693  }
694
695  /**
696   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
697   * using passed parameters. Sets error or boolean result ('true' if table matches
698   * the passed-in targetState).
699   */
700  private static CompletableFuture<Boolean> completeCheckTableState(
701      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
702      TableState.State targetState, TableName tableName) {
703    if (error != null) {
704      future.completeExceptionally(error);
705    } else {
706      if (tableState != null) {
707        future.complete(tableState.inStates(targetState));
708      } else {
709        future.completeExceptionally(new TableNotFoundException(tableName));
710      }
711    }
712    return future;
713  }
714
715  @Override
716  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
717    if (TableName.isMetaTableName(tableName)) {
718      return CompletableFuture.completedFuture(true);
719    }
720    CompletableFuture<Boolean> future = new CompletableFuture<>();
721    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
722      (tableState, error) -> {
723        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
724          TableState.State.ENABLED, tableName);
725      });
726    return future;
727  }
728
729  @Override
730  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
731    if (TableName.isMetaTableName(tableName)) {
732      return CompletableFuture.completedFuture(false);
733    }
734    CompletableFuture<Boolean> future = new CompletableFuture<>();
735    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
736      (tableState, error) -> {
737        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
738          TableState.State.DISABLED, tableName);
739      });
740    return future;
741  }
742
743  @Override
744  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
745    if (TableName.isMetaTableName(tableName)) {
746      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
747        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
748    }
749    CompletableFuture<Boolean> future = new CompletableFuture<>();
750    addListener(isTableEnabled(tableName), (enabled, error) -> {
751      if (error != null) {
752        if (error instanceof TableNotFoundException) {
753          future.complete(false);
754        } else {
755          future.completeExceptionally(error);
756        }
757        return;
758      }
759      if (!enabled) {
760        future.complete(false);
761      } else {
762        addListener(
763          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
764          (locations, error1) -> {
765            if (error1 != null) {
766              future.completeExceptionally(error1);
767              return;
768            }
769            List<HRegionLocation> notDeployedRegions = locations.stream()
770              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
771            if (notDeployedRegions.size() > 0) {
772              if (LOG.isDebugEnabled()) {
773                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
774              }
775              future.complete(false);
776              return;
777            }
778            future.complete(true);
779          });
780      }
781    });
782    return future;
783  }
784
785  @Override
786  public CompletableFuture<Void> addColumnFamily(
787      TableName tableName, ColumnFamilyDescriptor columnFamily) {
788    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
789      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
790        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
791      new AddColumnFamilyProcedureBiConsumer(tableName));
792  }
793
794  @Override
795  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
796    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
797      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
798        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
799      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
800  }
801
802  @Override
803  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
804      ColumnFamilyDescriptor columnFamily) {
805    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
806      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
807        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
808      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
809  }
810
811  @Override
812  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
813    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
814      RequestConverter.buildCreateNamespaceRequest(descriptor),
815      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
816      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
817  }
818
819  @Override
820  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
821    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
822      RequestConverter.buildModifyNamespaceRequest(descriptor),
823      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
824      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
825  }
826
827  @Override
828  public CompletableFuture<Void> deleteNamespace(String name) {
829    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
830      RequestConverter.buildDeleteNamespaceRequest(name),
831      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
832      new DeleteNamespaceProcedureBiConsumer(name));
833  }
834
835  @Override
836  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
837    return this
838        .<NamespaceDescriptor> newMasterCaller()
839        .action(
840          (controller, stub) -> this
841              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
842                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
843                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
844                      -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
845  }
846
847  @Override
848  public CompletableFuture<List<String>> listNamespaces() {
849    return this
850        .<List<String>> newMasterCaller()
851        .action(
852          (controller, stub) -> this
853              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
854                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
855                  done) -> s.listNamespaces(c, req, done),
856                (resp) -> resp.getNamespaceNameList())).call();
857  }
858
859  @Override
860  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
861    return this
862        .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
863              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
864                  List<NamespaceDescriptor>> call(controller, stub,
865                  ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
866                      s.listNamespaceDescriptors(c, req, done),
867                    (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
868  }
869
870  @Override
871  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
872    return this.<List<RegionInfo>> newAdminCaller()
873        .action((controller, stub) -> this
874            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
875              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
876              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
877              resp -> ProtobufUtil.getRegionInfos(resp)))
878        .serverName(serverName).call();
879  }
880
881  @Override
882  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
883    if (tableName.equals(META_TABLE_NAME)) {
884      return connection.registry.getMetaRegionLocations()
885        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
886          .collect(Collectors.toList()));
887    } else {
888      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
889        .thenApply(
890          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
891    }
892  }
893  @Override
894  public CompletableFuture<Void> flush(TableName tableName) {
895    return flush(tableName, null);
896  }
897
898  @Override
899  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
900    CompletableFuture<Void> future = new CompletableFuture<>();
901    addListener(tableExists(tableName), (exists, err) -> {
902      if (err != null) {
903        future.completeExceptionally(err);
904      } else if (!exists) {
905        future.completeExceptionally(new TableNotFoundException(tableName));
906      } else {
907        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
908          if (err2 != null) {
909            future.completeExceptionally(err2);
910          } else if (!tableEnabled) {
911            future.completeExceptionally(new TableNotEnabledException(tableName));
912          } else {
913            Map<String, String> props = new HashMap<>();
914            if (columnFamily != null) {
915              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
916            }
917            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
918              props), (ret, err3) -> {
919                if (err3 != null) {
920                  future.completeExceptionally(err3);
921                } else {
922                  future.complete(ret);
923                }
924              });
925          }
926        });
927      }
928    });
929    return future;
930  }
931
932  @Override
933  public CompletableFuture<Void> flushRegion(byte[] regionName) {
934    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
935    });
936  }
937
938  @Override
939  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
940    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
941      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
942    return flushRegionInternal(regionName, columnFamily, false)
943      .thenAccept(r -> {});
944  }
945
946  /**
947   * This method is for internal use only, where we need the response of the flush.
948   * <p/>
949   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
950   * API.
951   */
952  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
953    byte[] columnFamily, boolean writeFlushWALMarker) {
954    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
955    addListener(getRegionLocation(regionName), (location, err) -> {
956      if (err != null) {
957        future.completeExceptionally(err);
958        return;
959      }
960      ServerName serverName = location.getServerName();
961      if (serverName == null) {
962        future
963          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
964        return;
965      }
966      addListener(
967        flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
968        (ret, err2) -> {
969          if (err2 != null) {
970            future.completeExceptionally(err2);
971          } else {
972            future.complete(ret);
973          }});
974    });
975    return future;
976  }
977
978  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
979    byte[] columnFamily, boolean writeFlushWALMarker) {
980    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
981      .action((controller, stub) -> this
982        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
983          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(),
984            columnFamily, writeFlushWALMarker),
985          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
986      .call();
987  }
988
989  @Override
990  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
991    CompletableFuture<Void> future = new CompletableFuture<>();
992    addListener(getRegions(sn), (hRegionInfos, err) -> {
993      if (err != null) {
994        future.completeExceptionally(err);
995        return;
996      }
997      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
998      if (hRegionInfos != null) {
999        hRegionInfos.forEach(
1000          region -> compactFutures.add(
1001            flush(sn, region, null, false).thenAccept(r -> {})
1002          )
1003        );
1004      }
1005      addListener(CompletableFuture.allOf(
1006        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1007          if (err2 != null) {
1008            future.completeExceptionally(err2);
1009          } else {
1010            future.complete(ret);
1011          }
1012        });
1013    });
1014    return future;
1015  }
1016
1017  @Override
1018  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1019    return compact(tableName, null, false, compactType);
1020  }
1021
1022  @Override
1023  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1024      CompactType compactType) {
1025    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1026        + "If you don't specify a columnFamily, use compact(TableName) instead");
1027    return compact(tableName, columnFamily, false, compactType);
1028  }
1029
1030  @Override
1031  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1032    return compactRegion(regionName, null, false);
1033  }
1034
1035  @Override
1036  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1037    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1038        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1039    return compactRegion(regionName, columnFamily, false);
1040  }
1041
1042  @Override
1043  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1044    return compact(tableName, null, true, compactType);
1045  }
1046
1047  @Override
1048  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1049      CompactType compactType) {
1050    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1051        + "If you don't specify a columnFamily, use compact(TableName) instead");
1052    return compact(tableName, columnFamily, true, compactType);
1053  }
1054
1055  @Override
1056  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1057    return compactRegion(regionName, null, true);
1058  }
1059
1060  @Override
1061  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1062    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1063        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1064    return compactRegion(regionName, columnFamily, true);
1065  }
1066
1067  @Override
1068  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1069    return compactRegionServer(sn, false);
1070  }
1071
1072  @Override
1073  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1074    return compactRegionServer(sn, true);
1075  }
1076
1077  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1078    CompletableFuture<Void> future = new CompletableFuture<>();
1079    addListener(getRegions(sn), (hRegionInfos, err) -> {
1080      if (err != null) {
1081        future.completeExceptionally(err);
1082        return;
1083      }
1084      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1085      if (hRegionInfos != null) {
1086        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1087      }
1088      addListener(CompletableFuture.allOf(
1089        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1090          if (err2 != null) {
1091            future.completeExceptionally(err2);
1092          } else {
1093            future.complete(ret);
1094          }
1095        });
1096    });
1097    return future;
1098  }
1099
1100  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1101      boolean major) {
1102    CompletableFuture<Void> future = new CompletableFuture<>();
1103    addListener(getRegionLocation(regionName), (location, err) -> {
1104      if (err != null) {
1105        future.completeExceptionally(err);
1106        return;
1107      }
1108      ServerName serverName = location.getServerName();
1109      if (serverName == null) {
1110        future
1111          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1112        return;
1113      }
1114      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1115        (ret, err2) -> {
1116          if (err2 != null) {
1117            future.completeExceptionally(err2);
1118          } else {
1119            future.complete(ret);
1120          }
1121        });
1122    });
1123    return future;
1124  }
1125
1126  /**
1127   * List all region locations for the specific table.
1128   */
1129  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1130    if (TableName.META_TABLE_NAME.equals(tableName)) {
1131      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1132      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1133        if (err != null) {
1134          future.completeExceptionally(err);
1135        } else if (metaRegions == null || metaRegions.isEmpty() ||
1136          metaRegions.getDefaultRegionLocation() == null) {
1137          future.completeExceptionally(new IOException("meta region does not found"));
1138        } else {
1139          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1140        }
1141      });
1142      return future;
1143    } else {
1144      // For non-meta table, we fetch all locations by scanning hbase:meta table
1145      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1146    }
1147  }
1148
1149  /**
1150   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1151   */
1152  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1153      CompactType compactType) {
1154    CompletableFuture<Void> future = new CompletableFuture<>();
1155
1156    switch (compactType) {
1157      case MOB:
1158        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1159          if (err != null) {
1160            future.completeExceptionally(err);
1161            return;
1162          }
1163          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1164          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1165            if (err2 != null) {
1166              future.completeExceptionally(err2);
1167            } else {
1168              future.complete(ret);
1169            }
1170          });
1171        });
1172        break;
1173      case NORMAL:
1174        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1175          if (err != null) {
1176            future.completeExceptionally(err);
1177            return;
1178          }
1179          if (locations == null || locations.isEmpty()) {
1180            future.completeExceptionally(new TableNotFoundException(tableName));
1181          }
1182          CompletableFuture<?>[] compactFutures =
1183            locations.stream().filter(l -> l.getRegion() != null)
1184              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1185              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1186              .toArray(CompletableFuture<?>[]::new);
1187          // future complete unless all of the compact futures are completed.
1188          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1189            if (err2 != null) {
1190              future.completeExceptionally(err2);
1191            } else {
1192              future.complete(ret);
1193            }
1194          });
1195        });
1196        break;
1197      default:
1198        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1199    }
1200    return future;
1201  }
1202
1203  /**
1204   * Compact the region at specific region server.
1205   */
1206  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1207      final boolean major, byte[] columnFamily) {
1208    return this
1209        .<Void> newAdminCaller()
1210        .serverName(sn)
1211        .action(
1212          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1213            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1214              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1215            resp -> null)).call();
1216  }
1217
1218  private byte[] toEncodeRegionName(byte[] regionName) {
1219    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1220      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1221  }
1222
1223  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1224      CompletableFuture<TableName> result) {
1225    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1226      if (err != null) {
1227        result.completeExceptionally(err);
1228        return;
1229      }
1230      RegionInfo regionInfo = location.getRegion();
1231      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1232        result.completeExceptionally(
1233          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1234        return;
1235      }
1236      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1237        if (!tableName.get().equals(regionInfo.getTable())) {
1238          // tables of this two region should be same.
1239          result.completeExceptionally(
1240            new IllegalArgumentException("Cannot merge regions from two different tables " +
1241              tableName.get() + " and " + regionInfo.getTable()));
1242        } else {
1243          result.complete(tableName.get());
1244        }
1245      }
1246    });
1247  }
1248
1249  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1250    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1251    CompletableFuture<TableName> future = new CompletableFuture<>();
1252    for (byte[] encodedRegionName : encodedRegionNames) {
1253      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1254    }
1255    return future;
1256  }
1257
1258  @Override
1259  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1260    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1261  }
1262
1263  @Override
1264  public CompletableFuture<Boolean> isMergeEnabled() {
1265    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1266  }
1267
1268  @Override
1269  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1270    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1271  }
1272
1273  @Override
1274  public CompletableFuture<Boolean> isSplitEnabled() {
1275    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1276  }
1277
1278  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1279      MasterSwitchType switchType) {
1280    SetSplitOrMergeEnabledRequest request =
1281      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1282    return this.<Boolean> newMasterCaller()
1283      .action((controller, stub) -> this
1284        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1285          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1286          (resp) -> resp.getPrevValueList().get(0)))
1287      .call();
1288  }
1289
1290  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1291    IsSplitOrMergeEnabledRequest request =
1292        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1293    return this
1294        .<Boolean> newMasterCaller()
1295        .action(
1296          (controller, stub) -> this
1297              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1298                controller, stub, request,
1299                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1300                (resp) -> resp.getEnabled())).call();
1301  }
1302
1303  @Override
1304  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1305    if (nameOfRegionsToMerge.size() < 2) {
1306      return failedFuture(new IllegalArgumentException(
1307        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1308    }
1309    CompletableFuture<Void> future = new CompletableFuture<>();
1310    byte[][] encodedNameOfRegionsToMerge =
1311      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1312
1313    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1314      if (err != null) {
1315        future.completeExceptionally(err);
1316        return;
1317      }
1318
1319      final MergeTableRegionsRequest request;
1320      try {
1321        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1322          forcible, ng.getNonceGroup(), ng.newNonce());
1323      } catch (DeserializationException e) {
1324        future.completeExceptionally(e);
1325        return;
1326      }
1327
1328      addListener(
1329        this.procedureCall(tableName, request,
1330          MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
1331          new MergeTableRegionProcedureBiConsumer(tableName)),
1332        (ret, err2) -> {
1333          if (err2 != null) {
1334            future.completeExceptionally(err2);
1335          } else {
1336            future.complete(ret);
1337          }
1338        });
1339    });
1340    return future;
1341  }
1342
1343  @Override
1344  public CompletableFuture<Void> split(TableName tableName) {
1345    CompletableFuture<Void> future = new CompletableFuture<>();
1346    addListener(tableExists(tableName), (exist, error) -> {
1347      if (error != null) {
1348        future.completeExceptionally(error);
1349        return;
1350      }
1351      if (!exist) {
1352        future.completeExceptionally(new TableNotFoundException(tableName));
1353        return;
1354      }
1355      addListener(metaTable
1356        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1357          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1358            ClientMetaTableAccessor.QueryType.REGION))
1359          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1360            ClientMetaTableAccessor.QueryType.REGION))),
1361        (results, err2) -> {
1362          if (err2 != null) {
1363            future.completeExceptionally(err2);
1364            return;
1365          }
1366          if (results != null && !results.isEmpty()) {
1367            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1368            for (Result r : results) {
1369              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1370                continue;
1371              }
1372              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1373              if (rl != null) {
1374                for (HRegionLocation h : rl.getRegionLocations()) {
1375                  if (h != null && h.getServerName() != null) {
1376                    RegionInfo hri = h.getRegion();
1377                    if (hri == null || hri.isSplitParent() ||
1378                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1379                      continue;
1380                    }
1381                    splitFutures.add(split(hri, null));
1382                  }
1383                }
1384              }
1385            }
1386            addListener(
1387              CompletableFuture
1388                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1389              (ret, exception) -> {
1390                if (exception != null) {
1391                  future.completeExceptionally(exception);
1392                  return;
1393                }
1394                future.complete(ret);
1395              });
1396          } else {
1397            future.complete(null);
1398          }
1399        });
1400    });
1401    return future;
1402  }
1403
1404  @Override
1405  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1406    CompletableFuture<Void> result = new CompletableFuture<>();
1407    if (splitPoint == null) {
1408      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1409    }
1410    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1411      (loc, err) -> {
1412        if (err != null) {
1413          result.completeExceptionally(err);
1414        } else if (loc == null || loc.getRegion() == null) {
1415          result.completeExceptionally(new IllegalArgumentException(
1416            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1417        } else {
1418          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1419            if (err2 != null) {
1420              result.completeExceptionally(err2);
1421            } else {
1422              result.complete(ret);
1423            }
1424
1425          });
1426        }
1427      });
1428    return result;
1429  }
1430
1431  @Override
1432  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1433    CompletableFuture<Void> future = new CompletableFuture<>();
1434    addListener(getRegionLocation(regionName), (location, err) -> {
1435      if (err != null) {
1436        future.completeExceptionally(err);
1437        return;
1438      }
1439      RegionInfo regionInfo = location.getRegion();
1440      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1441        future
1442          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1443            "Replicas are auto-split when their primary is split."));
1444        return;
1445      }
1446      ServerName serverName = location.getServerName();
1447      if (serverName == null) {
1448        future
1449          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1450        return;
1451      }
1452      addListener(split(regionInfo, null), (ret, err2) -> {
1453        if (err2 != null) {
1454          future.completeExceptionally(err2);
1455        } else {
1456          future.complete(ret);
1457        }
1458      });
1459    });
1460    return future;
1461  }
1462
1463  @Override
1464  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1465    Preconditions.checkNotNull(splitPoint,
1466      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1467    CompletableFuture<Void> future = new CompletableFuture<>();
1468    addListener(getRegionLocation(regionName), (location, err) -> {
1469      if (err != null) {
1470        future.completeExceptionally(err);
1471        return;
1472      }
1473      RegionInfo regionInfo = location.getRegion();
1474      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1475        future
1476          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1477            "Replicas are auto-split when their primary is split."));
1478        return;
1479      }
1480      ServerName serverName = location.getServerName();
1481      if (serverName == null) {
1482        future
1483          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1484        return;
1485      }
1486      if (regionInfo.getStartKey() != null &&
1487        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1488        future.completeExceptionally(
1489          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1490        return;
1491      }
1492      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1493        if (err2 != null) {
1494          future.completeExceptionally(err2);
1495        } else {
1496          future.complete(ret);
1497        }
1498      });
1499    });
1500    return future;
1501  }
1502
1503  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1504    CompletableFuture<Void> future = new CompletableFuture<>();
1505    TableName tableName = hri.getTable();
1506    final SplitTableRegionRequest request;
1507    try {
1508      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1509        ng.newNonce());
1510    } catch (DeserializationException e) {
1511      future.completeExceptionally(e);
1512      return future;
1513    }
1514
1515    addListener(
1516      this.procedureCall(tableName,
1517        request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
1518        new SplitTableRegionProcedureBiConsumer(tableName)),
1519      (ret, err2) -> {
1520        if (err2 != null) {
1521          future.completeExceptionally(err2);
1522        } else {
1523          future.complete(ret);
1524        }
1525      });
1526    return future;
1527  }
1528
1529  @Override
1530  public CompletableFuture<Void> assign(byte[] regionName) {
1531    CompletableFuture<Void> future = new CompletableFuture<>();
1532    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1533      if (err != null) {
1534        future.completeExceptionally(err);
1535        return;
1536      }
1537      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1538        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1539          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1540          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1541        .call(), (ret, err2) -> {
1542          if (err2 != null) {
1543            future.completeExceptionally(err2);
1544          } else {
1545            future.complete(ret);
1546          }
1547        });
1548    });
1549    return future;
1550  }
1551
1552  @Override
1553  public CompletableFuture<Void> unassign(byte[] regionName) {
1554    CompletableFuture<Void> future = new CompletableFuture<>();
1555    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1556      if (err != null) {
1557        future.completeExceptionally(err);
1558        return;
1559      }
1560      addListener(
1561        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1562          .action(((controller, stub) -> this
1563            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1564              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1565              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1566          .call(),
1567        (ret, err2) -> {
1568          if (err2 != null) {
1569            future.completeExceptionally(err2);
1570          } else {
1571            future.complete(ret);
1572          }
1573        });
1574    });
1575    return future;
1576  }
1577
1578  @Override
1579  public CompletableFuture<Void> offline(byte[] regionName) {
1580    CompletableFuture<Void> future = new CompletableFuture<>();
1581    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1582      if (err != null) {
1583        future.completeExceptionally(err);
1584        return;
1585      }
1586      addListener(
1587        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1588          .action(((controller, stub) -> this
1589            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1590              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1591              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1592          .call(),
1593        (ret, err2) -> {
1594          if (err2 != null) {
1595            future.completeExceptionally(err2);
1596          } else {
1597            future.complete(ret);
1598          }
1599        });
1600    });
1601    return future;
1602  }
1603
1604  @Override
1605  public CompletableFuture<Void> move(byte[] regionName) {
1606    CompletableFuture<Void> future = new CompletableFuture<>();
1607    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1608      if (err != null) {
1609        future.completeExceptionally(err);
1610        return;
1611      }
1612      addListener(
1613        moveRegion(regionInfo,
1614          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1615        (ret, err2) -> {
1616          if (err2 != null) {
1617            future.completeExceptionally(err2);
1618          } else {
1619            future.complete(ret);
1620          }
1621        });
1622    });
1623    return future;
1624  }
1625
1626  @Override
1627  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1628    Preconditions.checkNotNull(destServerName,
1629      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1630    CompletableFuture<Void> future = new CompletableFuture<>();
1631    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1632      if (err != null) {
1633        future.completeExceptionally(err);
1634        return;
1635      }
1636      addListener(
1637        moveRegion(regionInfo, RequestConverter
1638          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1639        (ret, err2) -> {
1640          if (err2 != null) {
1641            future.completeExceptionally(err2);
1642          } else {
1643            future.complete(ret);
1644          }
1645        });
1646    });
1647    return future;
1648  }
1649
1650  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1651    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1652      .action(
1653        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1654          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1655      .call();
1656  }
1657
1658  @Override
1659  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1660    return this
1661        .<Void> newMasterCaller()
1662        .action(
1663          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1664            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1665            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1666  }
1667
1668  @Override
1669  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1670    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1671    Scan scan = QuotaTableUtil.makeScan(filter);
1672    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1673        .scan(scan, new AdvancedScanResultConsumer() {
1674          List<QuotaSettings> settings = new ArrayList<>();
1675
1676          @Override
1677          public void onNext(Result[] results, ScanController controller) {
1678            for (Result result : results) {
1679              try {
1680                QuotaTableUtil.parseResultToCollection(result, settings);
1681              } catch (IOException e) {
1682                controller.terminate();
1683                future.completeExceptionally(e);
1684              }
1685            }
1686          }
1687
1688          @Override
1689          public void onError(Throwable error) {
1690            future.completeExceptionally(error);
1691          }
1692
1693          @Override
1694          public void onComplete() {
1695            future.complete(settings);
1696          }
1697        });
1698    return future;
1699  }
1700
1701  @Override
1702  public CompletableFuture<Void> addReplicationPeer(String peerId,
1703      ReplicationPeerConfig peerConfig, boolean enabled) {
1704    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1705      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1706      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1707      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1708  }
1709
1710  @Override
1711  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1712    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1713      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1714      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1715      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1716  }
1717
1718  @Override
1719  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1720    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1721      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1722      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1723      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1724  }
1725
1726  @Override
1727  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1728    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1729      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1730      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1731      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1732  }
1733
1734  @Override
1735  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1736    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1737      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
1738          call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1739            (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1740            (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1741  }
1742
1743  @Override
1744  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1745      ReplicationPeerConfig peerConfig) {
1746    return this
1747      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1748        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1749        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1750        (resp) -> resp.getProcId(),
1751        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1752  }
1753
1754  @Override
1755  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1756      SyncReplicationState clusterState) {
1757    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1758        TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1759        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1760          clusterState),
1761          (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1762          (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1763            () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1764  }
1765
1766  @Override
1767  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1768      Map<TableName, List<String>> tableCfs) {
1769    if (tableCfs == null) {
1770      return failedFuture(new ReplicationException("tableCfs is null"));
1771    }
1772
1773    CompletableFuture<Void> future = new CompletableFuture<Void>();
1774    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1775      if (!completeExceptionally(future, error)) {
1776        ReplicationPeerConfig newPeerConfig =
1777          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1778        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1779          if (!completeExceptionally(future, error)) {
1780            future.complete(result);
1781          }
1782        });
1783      }
1784    });
1785    return future;
1786  }
1787
1788  @Override
1789  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1790      Map<TableName, List<String>> tableCfs) {
1791    if (tableCfs == null) {
1792      return failedFuture(new ReplicationException("tableCfs is null"));
1793    }
1794
1795    CompletableFuture<Void> future = new CompletableFuture<Void>();
1796    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1797      if (!completeExceptionally(future, error)) {
1798        ReplicationPeerConfig newPeerConfig = null;
1799        try {
1800          newPeerConfig = ReplicationPeerConfigUtil
1801            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1802        } catch (ReplicationException e) {
1803          future.completeExceptionally(e);
1804          return;
1805        }
1806        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1807          if (!completeExceptionally(future, error)) {
1808            future.complete(result);
1809          }
1810        });
1811      }
1812    });
1813    return future;
1814  }
1815
1816  @Override
1817  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1818    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1819  }
1820
1821  @Override
1822  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1823    Preconditions.checkNotNull(pattern,
1824      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1825    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1826  }
1827
1828  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1829      ListReplicationPeersRequest request) {
1830    return this
1831        .<List<ReplicationPeerDescription>> newMasterCaller()
1832        .action(
1833          (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1834              List<ReplicationPeerDescription>> call(controller, stub, request,
1835                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1836                (resp) -> resp.getPeerDescList().stream()
1837                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1838                    .collect(Collectors.toList()))).call();
1839  }
1840
1841  @Override
1842  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1843    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1844    addListener(listTableDescriptors(), (tables, error) -> {
1845      if (!completeExceptionally(future, error)) {
1846        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1847        tables.forEach(table -> {
1848          Map<String, Integer> cfs = new HashMap<>();
1849          Stream.of(table.getColumnFamilies())
1850            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1851            .forEach(column -> {
1852              cfs.put(column.getNameAsString(), column.getScope());
1853            });
1854          if (!cfs.isEmpty()) {
1855            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1856          }
1857        });
1858        future.complete(replicatedTableCFs);
1859      }
1860    });
1861    return future;
1862  }
1863
1864  @Override
1865  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1866    SnapshotProtos.SnapshotDescription snapshot =
1867      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1868    try {
1869      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1870    } catch (IllegalArgumentException e) {
1871      return failedFuture(e);
1872    }
1873    CompletableFuture<Void> future = new CompletableFuture<>();
1874    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1875    addListener(this.<Long> newMasterCaller()
1876      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1877        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1878        resp -> resp.getExpectedTimeout()))
1879      .call(), (expectedTimeout, err) -> {
1880        if (err != null) {
1881          future.completeExceptionally(err);
1882          return;
1883        }
1884        TimerTask pollingTask = new TimerTask() {
1885          int tries = 0;
1886          long startTime = EnvironmentEdgeManager.currentTime();
1887          long endTime = startTime + expectedTimeout;
1888          long maxPauseTime = expectedTimeout / maxAttempts;
1889
1890          @Override
1891          public void run(Timeout timeout) throws Exception {
1892            if (EnvironmentEdgeManager.currentTime() < endTime) {
1893              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1894                if (err2 != null) {
1895                  future.completeExceptionally(err2);
1896                } else if (done) {
1897                  future.complete(null);
1898                } else {
1899                  // retry again after pauseTime.
1900                  long pauseTime =
1901                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1902                  pauseTime = Math.min(pauseTime, maxPauseTime);
1903                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1904                    TimeUnit.MILLISECONDS);
1905                }
1906              });
1907            } else {
1908              future.completeExceptionally(
1909                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1910                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1911            }
1912          }
1913        };
1914        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1915      });
1916    return future;
1917  }
1918
1919  @Override
1920  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1921    return this
1922        .<Boolean> newMasterCaller()
1923        .action(
1924          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1925            controller,
1926            stub,
1927            IsSnapshotDoneRequest.newBuilder()
1928                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1929                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1930  }
1931
1932  @Override
1933  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1934    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1935      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1936      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1937    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1938  }
1939
1940  @Override
1941  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1942      boolean restoreAcl) {
1943    CompletableFuture<Void> future = new CompletableFuture<>();
1944    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1945      if (err != null) {
1946        future.completeExceptionally(err);
1947        return;
1948      }
1949      TableName tableName = null;
1950      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1951        for (SnapshotDescription snap : snapshotDescriptions) {
1952          if (snap.getName().equals(snapshotName)) {
1953            tableName = snap.getTableName();
1954            break;
1955          }
1956        }
1957      }
1958      if (tableName == null) {
1959        future.completeExceptionally(new RestoreSnapshotException(
1960          "Unable to find the table name for snapshot=" + snapshotName));
1961        return;
1962      }
1963      final TableName finalTableName = tableName;
1964      addListener(tableExists(finalTableName), (exists, err2) -> {
1965        if (err2 != null) {
1966          future.completeExceptionally(err2);
1967        } else if (!exists) {
1968          // if table does not exist, then just clone snapshot into new table.
1969          completeConditionalOnFuture(future,
1970            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
1971        } else {
1972          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1973            if (err4 != null) {
1974              future.completeExceptionally(err4);
1975            } else if (!disabled) {
1976              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1977            } else {
1978              completeConditionalOnFuture(future,
1979                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1980            }
1981          });
1982        }
1983      });
1984    });
1985    return future;
1986  }
1987
1988  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1989      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1990    if (takeFailSafeSnapshot) {
1991      CompletableFuture<Void> future = new CompletableFuture<>();
1992      // Step.1 Take a snapshot of the current state
1993      String failSafeSnapshotSnapshotNameFormat =
1994        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1995          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1996      final String failSafeSnapshotSnapshotName =
1997        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1998          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1999          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2000      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2001      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2002        if (err != null) {
2003          future.completeExceptionally(err);
2004        } else {
2005          // Step.2 Restore snapshot
2006          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2007            (void2, err2) -> {
2008              if (err2 != null) {
2009                // Step.3.a Something went wrong during the restore and try to rollback.
2010                addListener(
2011                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl,
2012                    null),
2013                  (void3, err3) -> {
2014                    if (err3 != null) {
2015                      future.completeExceptionally(err3);
2016                    } else {
2017                      String msg =
2018                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
2019                          failSafeSnapshotSnapshotName + " succeeded.";
2020                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2021                    }
2022                  });
2023              } else {
2024                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2025                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2026                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2027                  if (err3 != null) {
2028                    LOG.error(
2029                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2030                      err3);
2031                  }
2032                  future.complete(ret3);
2033                });
2034              }
2035            });
2036        }
2037      });
2038      return future;
2039    } else {
2040      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2041    }
2042  }
2043
2044  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2045      CompletableFuture<T> parentFuture) {
2046    addListener(parentFuture, (res, err) -> {
2047      if (err != null) {
2048        dependentFuture.completeExceptionally(err);
2049      } else {
2050        dependentFuture.complete(res);
2051      }
2052    });
2053  }
2054
2055  @Override
2056  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2057      boolean restoreAcl, String customSFT) {
2058    CompletableFuture<Void> future = new CompletableFuture<>();
2059    addListener(tableExists(tableName), (exists, err) -> {
2060      if (err != null) {
2061        future.completeExceptionally(err);
2062      } else if (exists) {
2063        future.completeExceptionally(new TableExistsException(tableName));
2064      } else {
2065        completeConditionalOnFuture(future,
2066          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2067      }
2068    });
2069    return future;
2070  }
2071
2072  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2073      boolean restoreAcl, String customSFT) {
2074    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2075      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2076    try {
2077      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2078    } catch (IllegalArgumentException e) {
2079      return failedFuture(e);
2080    }
2081    RestoreSnapshotRequest.Builder builder =
2082      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2083        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2084    if(customSFT != null){
2085      builder.setCustomSFT(customSFT);
2086    }
2087    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2088      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2089        builder.build(),
2090        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2091      .call());
2092  }
2093
2094  @Override
2095  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2096    return getCompletedSnapshots(null);
2097  }
2098
2099  @Override
2100  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2101    Preconditions.checkNotNull(pattern,
2102      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2103    return getCompletedSnapshots(pattern);
2104  }
2105
2106  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2107    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2108        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2109        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2110          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2111          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2112        .call();
2113  }
2114
2115  @Override
2116  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2117    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2118        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2119    return getCompletedSnapshots(tableNamePattern, null);
2120  }
2121
2122  @Override
2123  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2124      Pattern snapshotNamePattern) {
2125    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2126        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2127    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2128        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2129    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2130  }
2131
2132  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2133      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2134    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2135    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2136      if (err != null) {
2137        future.completeExceptionally(err);
2138        return;
2139      }
2140      if (tableNames == null || tableNames.size() <= 0) {
2141        future.complete(Collections.emptyList());
2142        return;
2143      }
2144      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2145        if (err2 != null) {
2146          future.completeExceptionally(err2);
2147          return;
2148        }
2149        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2150          future.complete(Collections.emptyList());
2151          return;
2152        }
2153        future.complete(snapshotDescList.stream()
2154          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2155          .collect(Collectors.toList()));
2156      });
2157    });
2158    return future;
2159  }
2160
2161  @Override
2162  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2163    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2164  }
2165
2166  @Override
2167  public CompletableFuture<Void> deleteSnapshots() {
2168    return internalDeleteSnapshots(null, null);
2169  }
2170
2171  @Override
2172  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2173    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2174        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2175    return internalDeleteSnapshots(null, snapshotNamePattern);
2176  }
2177
2178  @Override
2179  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2180    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2181        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2182    return internalDeleteSnapshots(tableNamePattern, null);
2183  }
2184
2185  @Override
2186  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2187      Pattern snapshotNamePattern) {
2188    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2189        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2190    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2191        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2192    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2193  }
2194
2195  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2196      Pattern snapshotNamePattern) {
2197    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2198    if (tableNamePattern == null) {
2199      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2200    } else {
2201      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2202    }
2203    CompletableFuture<Void> future = new CompletableFuture<>();
2204    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2205      if (err != null) {
2206        future.completeExceptionally(err);
2207        return;
2208      }
2209      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2210        future.complete(null);
2211        return;
2212      }
2213      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2214        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2215          if (e != null) {
2216            future.completeExceptionally(e);
2217          } else {
2218            future.complete(v);
2219          }
2220        });
2221    }));
2222    return future;
2223  }
2224
2225  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2226    return this
2227        .<Void> newMasterCaller()
2228        .action(
2229          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2230            controller,
2231            stub,
2232            DeleteSnapshotRequest.newBuilder()
2233                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2234                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2235  }
2236
2237  @Override
2238  public CompletableFuture<Void> execProcedure(String signature, String instance,
2239      Map<String, String> props) {
2240    CompletableFuture<Void> future = new CompletableFuture<>();
2241    ProcedureDescription procDesc =
2242      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2243    addListener(this.<Long> newMasterCaller()
2244      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2245        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2246        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2247      .call(), (expectedTimeout, err) -> {
2248        if (err != null) {
2249          future.completeExceptionally(err);
2250          return;
2251        }
2252        TimerTask pollingTask = new TimerTask() {
2253          int tries = 0;
2254          long startTime = EnvironmentEdgeManager.currentTime();
2255          long endTime = startTime + expectedTimeout;
2256          long maxPauseTime = expectedTimeout / maxAttempts;
2257
2258          @Override
2259          public void run(Timeout timeout) throws Exception {
2260            if (EnvironmentEdgeManager.currentTime() < endTime) {
2261              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2262                if (err2 != null) {
2263                  future.completeExceptionally(err2);
2264                  return;
2265                }
2266                if (done) {
2267                  future.complete(null);
2268                } else {
2269                  // retry again after pauseTime.
2270                  long pauseTime =
2271                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2272                  pauseTime = Math.min(pauseTime, maxPauseTime);
2273                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2274                    TimeUnit.MICROSECONDS);
2275                }
2276              });
2277            } else {
2278              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2279                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2280            }
2281          }
2282        };
2283        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2284        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2285      });
2286    return future;
2287  }
2288
2289  @Override
2290  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2291      Map<String, String> props) {
2292    ProcedureDescription proDesc =
2293        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2294    return this.<byte[]> newMasterCaller()
2295        .action(
2296          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2297            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2298            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2299            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2300        .call();
2301  }
2302
2303  @Override
2304  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2305      Map<String, String> props) {
2306    ProcedureDescription proDesc =
2307        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2308    return this.<Boolean> newMasterCaller()
2309        .action((controller, stub) -> this
2310            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2311              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2312              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2313        .call();
2314  }
2315
2316  @Override
2317  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2318    return this.<Boolean> newMasterCaller().action(
2319      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2320        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2321        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2322        .call();
2323  }
2324
2325  @Override
2326  public CompletableFuture<String> getProcedures() {
2327    return this
2328        .<String> newMasterCaller()
2329        .action(
2330          (controller, stub) -> this
2331              .<GetProceduresRequest, GetProceduresResponse, String> call(
2332                controller, stub, GetProceduresRequest.newBuilder().build(),
2333                (s, c, req, done) -> s.getProcedures(c, req, done),
2334                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2335  }
2336
2337  @Override
2338  public CompletableFuture<String> getLocks() {
2339    return this
2340        .<String> newMasterCaller()
2341        .action(
2342          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2343            controller, stub, GetLocksRequest.newBuilder().build(),
2344            (s, c, req, done) -> s.getLocks(c, req, done),
2345            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2346  }
2347
2348  @Override
2349  public CompletableFuture<Void> decommissionRegionServers(
2350      List<ServerName> servers, boolean offload) {
2351    return this.<Void> newMasterCaller()
2352        .action((controller, stub) -> this
2353          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2354            controller, stub,
2355              RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2356            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2357        .call();
2358  }
2359
2360  @Override
2361  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2362    return this.<List<ServerName>> newMasterCaller()
2363        .action((controller, stub) -> this
2364          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2365            List<ServerName>> call(
2366              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2367              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2368              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2369                  .collect(Collectors.toList())))
2370        .call();
2371  }
2372
2373  @Override
2374  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2375      List<byte[]> encodedRegionNames) {
2376    return this.<Void> newMasterCaller()
2377        .action((controller, stub) ->
2378            this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2379                controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2380                    server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2381                        c, req, done), resp -> null)).call();
2382  }
2383
2384  /**
2385   * Get the region location for the passed region name. The region name may be a full region name
2386   * or encoded region name. If the region does not found, then it'll throw an
2387   * UnknownRegionException wrapped by a {@link CompletableFuture}
2388   * @param regionNameOrEncodedRegionName region name or encoded region name
2389   * @return region location, wrapped by a {@link CompletableFuture}
2390   */
2391  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2392    if (regionNameOrEncodedRegionName == null) {
2393      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2394    }
2395
2396    CompletableFuture<Optional<HRegionLocation>> future;
2397    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2398      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2399      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2400        // old format encodedName, should be meta region
2401        future = connection.registry.getMetaRegionLocations()
2402          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2403            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2404      } else {
2405        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2406          regionNameOrEncodedRegionName);
2407      }
2408    } else {
2409      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2410      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2411      RegionInfo regionInfo;
2412      try {
2413        regionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(
2414          regionNameOrEncodedRegionName);
2415      } catch (IOException ioe) {
2416        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2417      }
2418
2419      if (regionInfo.isMetaRegion()) {
2420        future = connection.registry.getMetaRegionLocations()
2421          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2422            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2423            .findFirst());
2424      } else {
2425        future =
2426          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2427      }
2428    }
2429
2430    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2431    addListener(future, (location, err) -> {
2432      if (err != null) {
2433        returnedFuture.completeExceptionally(err);
2434        return;
2435      }
2436      if (!location.isPresent() || location.get().getRegion() == null) {
2437        returnedFuture.completeExceptionally(
2438          new UnknownRegionException("Invalid region name or encoded region name: " +
2439            Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2440      } else {
2441        returnedFuture.complete(location.get());
2442      }
2443    });
2444    return returnedFuture;
2445  }
2446
2447  /**
2448   * Get the region info for the passed region name. The region name may be a full region name or
2449   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2450   * wrapped by a {@link CompletableFuture}
2451   * @return region info, wrapped by a {@link CompletableFuture}
2452   */
2453  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2454    if (regionNameOrEncodedRegionName == null) {
2455      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2456    }
2457
2458    if (Bytes.equals(regionNameOrEncodedRegionName,
2459      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2460      Bytes.equals(regionNameOrEncodedRegionName,
2461        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2462      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2463    }
2464
2465    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2466    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2467      if (err != null) {
2468        future.completeExceptionally(err);
2469      } else {
2470        future.complete(location.getRegion());
2471      }
2472    });
2473    return future;
2474  }
2475
2476  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2477    if (numRegions < 3) {
2478      throw new IllegalArgumentException("Must create at least three regions");
2479    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2480      throw new IllegalArgumentException("Start key must be smaller than end key");
2481    }
2482    if (numRegions == 3) {
2483      return new byte[][] { startKey, endKey };
2484    }
2485    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2486    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2487      throw new IllegalArgumentException("Unable to split key range into enough regions");
2488    }
2489    return splitKeys;
2490  }
2491
2492  private void verifySplitKeys(byte[][] splitKeys) {
2493    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2494    // Verify there are no duplicate split keys
2495    byte[] lastKey = null;
2496    for (byte[] splitKey : splitKeys) {
2497      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2498        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2499      }
2500      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2501        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2502            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2503      }
2504      lastKey = splitKey;
2505    }
2506  }
2507
2508  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2509
2510    abstract void onFinished();
2511
2512    abstract void onError(Throwable error);
2513
2514    @Override
2515    public void accept(Void v, Throwable error) {
2516      if (error != null) {
2517        onError(error);
2518        return;
2519      }
2520      onFinished();
2521    }
2522  }
2523
2524  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2525    protected final TableName tableName;
2526
2527    TableProcedureBiConsumer(TableName tableName) {
2528      this.tableName = tableName;
2529    }
2530
2531    abstract String getOperationType();
2532
2533    String getDescription() {
2534      return "Operation: " + getOperationType() + ", " + "Table Name: "
2535          + tableName.getNameWithNamespaceInclAsString();
2536    }
2537
2538    @Override
2539    void onFinished() {
2540      LOG.info(getDescription() + " completed");
2541    }
2542
2543    @Override
2544    void onError(Throwable error) {
2545      LOG.info(getDescription() + " failed with " + error.getMessage());
2546    }
2547  }
2548
2549  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2550    protected final String namespaceName;
2551
2552    NamespaceProcedureBiConsumer(String namespaceName) {
2553      this.namespaceName = namespaceName;
2554    }
2555
2556    abstract String getOperationType();
2557
2558    String getDescription() {
2559      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2560    }
2561
2562    @Override
2563    void onFinished() {
2564      LOG.info(getDescription() + " completed");
2565    }
2566
2567    @Override
2568    void onError(Throwable error) {
2569      LOG.info(getDescription() + " failed with " + error.getMessage());
2570    }
2571  }
2572
2573  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2574
2575    CreateTableProcedureBiConsumer(TableName tableName) {
2576      super(tableName);
2577    }
2578
2579    @Override
2580    String getOperationType() {
2581      return "CREATE";
2582    }
2583  }
2584
2585  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2586
2587    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2588      super(tableName);
2589    }
2590
2591    @Override
2592    String getOperationType() {
2593      return "ENABLE";
2594    }
2595  }
2596
2597  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2598
2599    DeleteTableProcedureBiConsumer(TableName tableName) {
2600      super(tableName);
2601    }
2602
2603    @Override
2604    String getOperationType() {
2605      return "DELETE";
2606    }
2607
2608    @Override
2609    void onFinished() {
2610      connection.getLocator().clearCache(this.tableName);
2611      super.onFinished();
2612    }
2613  }
2614
2615  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2616
2617    TruncateTableProcedureBiConsumer(TableName tableName) {
2618      super(tableName);
2619    }
2620
2621    @Override
2622    String getOperationType() {
2623      return "TRUNCATE";
2624    }
2625  }
2626
2627  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2628
2629    EnableTableProcedureBiConsumer(TableName tableName) {
2630      super(tableName);
2631    }
2632
2633    @Override
2634    String getOperationType() {
2635      return "ENABLE";
2636    }
2637  }
2638
2639  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2640
2641    DisableTableProcedureBiConsumer(TableName tableName) {
2642      super(tableName);
2643    }
2644
2645    @Override
2646    String getOperationType() {
2647      return "DISABLE";
2648    }
2649  }
2650
2651  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2652
2653    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2654      super(tableName);
2655    }
2656
2657    @Override
2658    String getOperationType() {
2659      return "ADD_COLUMN_FAMILY";
2660    }
2661  }
2662
2663  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2664
2665    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2666      super(tableName);
2667    }
2668
2669    @Override
2670    String getOperationType() {
2671      return "DELETE_COLUMN_FAMILY";
2672    }
2673  }
2674
2675  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2676
2677    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2678      super(tableName);
2679    }
2680
2681    @Override
2682    String getOperationType() {
2683      return "MODIFY_COLUMN_FAMILY";
2684    }
2685  }
2686
2687  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2688
2689    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2690      super(namespaceName);
2691    }
2692
2693    @Override
2694    String getOperationType() {
2695      return "CREATE_NAMESPACE";
2696    }
2697  }
2698
2699  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2700
2701    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2702      super(namespaceName);
2703    }
2704
2705    @Override
2706    String getOperationType() {
2707      return "DELETE_NAMESPACE";
2708    }
2709  }
2710
2711  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2712
2713    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2714      super(namespaceName);
2715    }
2716
2717    @Override
2718    String getOperationType() {
2719      return "MODIFY_NAMESPACE";
2720    }
2721  }
2722
2723  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2724
2725    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2726      super(tableName);
2727    }
2728
2729    @Override
2730    String getOperationType() {
2731      return "MERGE_REGIONS";
2732    }
2733  }
2734
2735  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2736
2737    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2738      super(tableName);
2739    }
2740
2741    @Override
2742    String getOperationType() {
2743      return "SPLIT_REGION";
2744    }
2745  }
2746
2747  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2748    private final String peerId;
2749    private final Supplier<String> getOperation;
2750
2751    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2752      this.peerId = peerId;
2753      this.getOperation = getOperation;
2754    }
2755
2756    String getDescription() {
2757      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2758    }
2759
2760    @Override
2761    void onFinished() {
2762      LOG.info(getDescription() + " completed");
2763    }
2764
2765    @Override
2766    void onError(Throwable error) {
2767      LOG.info(getDescription() + " failed with " + error.getMessage());
2768    }
2769  }
2770
2771  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2772    CompletableFuture<Void> future = new CompletableFuture<>();
2773    addListener(procFuture, (procId, error) -> {
2774      if (error != null) {
2775        future.completeExceptionally(error);
2776        return;
2777      }
2778      getProcedureResult(procId, future, 0);
2779    });
2780    return future;
2781  }
2782
2783  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2784    addListener(
2785      this.<GetProcedureResultResponse> newMasterCaller()
2786        .action((controller, stub) -> this
2787          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2788            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2789            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2790        .call(),
2791      (response, error) -> {
2792        if (error != null) {
2793          LOG.warn("failed to get the procedure result procId={}", procId,
2794            ConnectionUtils.translateException(error));
2795          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2796            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2797          return;
2798        }
2799        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2800          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2801            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2802          return;
2803        }
2804        if (response.hasException()) {
2805          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2806          future.completeExceptionally(ioe);
2807        } else {
2808          future.complete(null);
2809        }
2810      });
2811  }
2812
2813  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2814    CompletableFuture<T> future = new CompletableFuture<>();
2815    future.completeExceptionally(error);
2816    return future;
2817  }
2818
2819  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2820    if (error != null) {
2821      future.completeExceptionally(error);
2822      return true;
2823    }
2824    return false;
2825  }
2826
2827  @Override
2828  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2829    return getClusterMetrics(EnumSet.allOf(Option.class));
2830  }
2831
2832  @Override
2833  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2834    return this
2835        .<ClusterMetrics> newMasterCaller()
2836        .action(
2837          (controller, stub) -> this
2838              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2839                stub, RequestConverter.buildGetClusterStatusRequest(options),
2840                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2841                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2842  }
2843
2844  @Override
2845  public CompletableFuture<Void> shutdown() {
2846    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2847      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2848        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2849        resp -> null))
2850      .call();
2851  }
2852
2853  @Override
2854  public CompletableFuture<Void> stopMaster() {
2855    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2856      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2857        controller, stub, StopMasterRequest.newBuilder().build(),
2858        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2859      .call();
2860  }
2861
2862  @Override
2863  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2864    StopServerRequest request = RequestConverter
2865      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2866    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2867      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2868        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2869        resp -> null))
2870      .serverName(serverName).call();
2871  }
2872
2873  @Override
2874  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2875    return this
2876        .<Void> newAdminCaller()
2877        .action(
2878          (controller, stub) -> this
2879              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2880                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2881                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2882        .serverName(serverName).call();
2883  }
2884
2885  @Override
2886  public CompletableFuture<Void> updateConfiguration() {
2887    CompletableFuture<Void> future = new CompletableFuture<Void>();
2888    addListener(
2889      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2890      (status, err) -> {
2891        if (err != null) {
2892          future.completeExceptionally(err);
2893        } else {
2894          List<CompletableFuture<Void>> futures = new ArrayList<>();
2895          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2896          futures.add(updateConfiguration(status.getMasterName()));
2897          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2898          addListener(
2899            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2900            (result, err2) -> {
2901              if (err2 != null) {
2902                future.completeExceptionally(err2);
2903              } else {
2904                future.complete(result);
2905              }
2906            });
2907        }
2908      });
2909    return future;
2910  }
2911
2912  @Override
2913  public CompletableFuture<Void> updateConfiguration(String groupName) {
2914    CompletableFuture<Void> future = new CompletableFuture<Void>();
2915    addListener(
2916      getRSGroup(groupName),
2917      (rsGroupInfo, err) -> {
2918        if (err != null) {
2919          future.completeExceptionally(err);
2920        } else if (rsGroupInfo == null) {
2921          future.completeExceptionally(
2922            new IllegalArgumentException("Group does not exist: " + groupName));
2923        } else {
2924          addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
2925            if (err2 != null) {
2926              future.completeExceptionally(err2);
2927            } else {
2928              List<CompletableFuture<Void>> futures = new ArrayList<>();
2929              List<ServerName> groupServers = status.getServersName().stream().filter(
2930                s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
2931              groupServers.forEach(server -> futures.add(updateConfiguration(server)));
2932              addListener(
2933                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2934                (result, err3) -> {
2935                  if (err3 != null) {
2936                    future.completeExceptionally(err3);
2937                  } else {
2938                    future.complete(result);
2939                  }
2940                });
2941            }
2942          });
2943        }
2944      });
2945    return future;
2946  }
2947
2948  @Override
2949  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2950    return this
2951        .<Void> newAdminCaller()
2952        .action(
2953          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2954            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2955            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2956        .serverName(serverName).call();
2957  }
2958
2959  @Override
2960  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2961    return this
2962        .<Void> newAdminCaller()
2963        .action(
2964          (controller, stub) -> this
2965              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2966                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2967                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2968        .serverName(serverName).call();
2969  }
2970
2971  @Override
2972  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2973    return this
2974        .<List<SecurityCapability>> newMasterCaller()
2975        .action(
2976          (controller, stub) -> this
2977              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
2978                  call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
2979                    (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2980                    (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2981        .call();
2982  }
2983
2984  @Override
2985  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2986    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2987  }
2988
2989  @Override
2990  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2991      TableName tableName) {
2992    Preconditions.checkNotNull(tableName,
2993      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2994    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2995  }
2996
2997  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2998      ServerName serverName) {
2999    return this.<List<RegionMetrics>> newAdminCaller()
3000        .action((controller, stub) -> this
3001            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
3002              adminCall(controller, stub, request, (s, c, req, done) ->
3003                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
3004        .serverName(serverName).call();
3005  }
3006
3007  @Override
3008  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3009    return this
3010        .<Boolean> newMasterCaller()
3011        .action(
3012          (controller, stub) -> this
3013              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
3014                stub, IsInMaintenanceModeRequest.newBuilder().build(),
3015                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3016                resp -> resp.getInMaintenanceMode())).call();
3017  }
3018
3019  @Override
3020  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3021      CompactType compactType) {
3022    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3023
3024    switch (compactType) {
3025      case MOB:
3026        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3027          if (err != null) {
3028            future.completeExceptionally(err);
3029            return;
3030          }
3031          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3032
3033          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3034            .action((controller, stub) -> this
3035              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3036                controller, stub,
3037                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3038                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3039            .call(), (resp2, err2) -> {
3040              if (err2 != null) {
3041                future.completeExceptionally(err2);
3042              } else {
3043                if (resp2.hasCompactionState()) {
3044                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3045                } else {
3046                  future.complete(CompactionState.NONE);
3047                }
3048              }
3049            });
3050        });
3051        break;
3052      case NORMAL:
3053        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3054          if (err != null) {
3055            future.completeExceptionally(err);
3056            return;
3057          }
3058          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3059          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3060          locations.stream().filter(loc -> loc.getServerName() != null)
3061            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3062            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3063              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3064                // If any region compaction state is MAJOR_AND_MINOR
3065                // the table compaction state is MAJOR_AND_MINOR, too.
3066                if (err2 != null) {
3067                  future.completeExceptionally(unwrapCompletionException(err2));
3068                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3069                  future.complete(regionState);
3070                } else {
3071                  regionStates.add(regionState);
3072                }
3073              }));
3074            });
3075          addListener(
3076            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3077            (ret, err3) -> {
3078              // If future not completed, check all regions's compaction state
3079              if (!future.isCompletedExceptionally() && !future.isDone()) {
3080                CompactionState state = CompactionState.NONE;
3081                for (CompactionState regionState : regionStates) {
3082                  switch (regionState) {
3083                    case MAJOR:
3084                      if (state == CompactionState.MINOR) {
3085                        future.complete(CompactionState.MAJOR_AND_MINOR);
3086                      } else {
3087                        state = CompactionState.MAJOR;
3088                      }
3089                      break;
3090                    case MINOR:
3091                      if (state == CompactionState.MAJOR) {
3092                        future.complete(CompactionState.MAJOR_AND_MINOR);
3093                      } else {
3094                        state = CompactionState.MINOR;
3095                      }
3096                      break;
3097                    case NONE:
3098                    default:
3099                  }
3100                }
3101                if (!future.isDone()) {
3102                  future.complete(state);
3103                }
3104              }
3105            });
3106        });
3107        break;
3108      default:
3109        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3110    }
3111
3112    return future;
3113  }
3114
3115  @Override
3116  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3117    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3118    addListener(getRegionLocation(regionName), (location, err) -> {
3119      if (err != null) {
3120        future.completeExceptionally(err);
3121        return;
3122      }
3123      ServerName serverName = location.getServerName();
3124      if (serverName == null) {
3125        future
3126          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3127        return;
3128      }
3129      addListener(
3130        this.<GetRegionInfoResponse> newAdminCaller()
3131          .action((controller, stub) -> this
3132            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3133              controller, stub,
3134              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3135                true),
3136              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3137          .serverName(serverName).call(),
3138        (resp2, err2) -> {
3139          if (err2 != null) {
3140            future.completeExceptionally(err2);
3141          } else {
3142            if (resp2.hasCompactionState()) {
3143              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3144            } else {
3145              future.complete(CompactionState.NONE);
3146            }
3147          }
3148        });
3149    });
3150    return future;
3151  }
3152
3153  @Override
3154  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3155    MajorCompactionTimestampRequest request =
3156        MajorCompactionTimestampRequest.newBuilder()
3157            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3158    return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3159        this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3160            call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3161                c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3162  }
3163
3164  @Override
3165  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3166      byte[] regionName) {
3167    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3168    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3169    addListener(getRegionInfo(regionName), (region, err) -> {
3170      if (err != null) {
3171        future.completeExceptionally(err);
3172        return;
3173      }
3174      MajorCompactionTimestampForRegionRequest.Builder builder =
3175        MajorCompactionTimestampForRegionRequest.newBuilder();
3176      builder.setRegion(
3177        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3178      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3179        .<MajorCompactionTimestampForRegionRequest,
3180        MajorCompactionTimestampResponse, Optional<Long>> call(
3181          controller, stub, builder.build(),
3182          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3183          ProtobufUtil::toOptionalTimestamp))
3184        .call(), (timestamp, err2) -> {
3185          if (err2 != null) {
3186            future.completeExceptionally(err2);
3187          } else {
3188            future.complete(timestamp);
3189          }
3190        });
3191    });
3192    return future;
3193  }
3194
3195  @Override
3196  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3197      List<String> serverNamesList) {
3198    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3199    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3200      if (err != null) {
3201        future.completeExceptionally(err);
3202        return;
3203      }
3204      // Accessed by multiple threads.
3205      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3206      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3207      serverNames.stream().forEach(serverName -> {
3208        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3209          if (err2 != null) {
3210            future.completeExceptionally(unwrapCompletionException(err2));
3211          } else {
3212            serverStates.put(serverName, serverState);
3213          }
3214        }));
3215      });
3216      addListener(
3217        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3218        (ret, err3) -> {
3219          if (!future.isCompletedExceptionally()) {
3220            if (err3 != null) {
3221              future.completeExceptionally(err3);
3222            } else {
3223              future.complete(serverStates);
3224            }
3225          }
3226        });
3227    });
3228    return future;
3229  }
3230
3231  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3232    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3233    if (serverNamesList.isEmpty()) {
3234      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3235        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3236      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3237        if (err != null) {
3238          future.completeExceptionally(err);
3239        } else {
3240          future.complete(clusterMetrics.getServersName());
3241        }
3242      });
3243      return future;
3244    } else {
3245      List<ServerName> serverList = new ArrayList<>();
3246      for (String regionServerName : serverNamesList) {
3247        ServerName serverName = null;
3248        try {
3249          serverName = ServerName.valueOf(regionServerName);
3250        } catch (Exception e) {
3251          future.completeExceptionally(
3252            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3253        }
3254        if (serverName == null) {
3255          future.completeExceptionally(
3256            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3257        } else {
3258          serverList.add(serverName);
3259        }
3260      }
3261      future.complete(serverList);
3262    }
3263    return future;
3264  }
3265
3266  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3267    return this
3268        .<Boolean>newAdminCaller()
3269        .serverName(serverName)
3270        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3271            Boolean>adminCall(controller, stub,
3272            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3273            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3274  }
3275
3276  @Override
3277  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3278    return this.<Boolean> newMasterCaller()
3279      .action((controller, stub) -> this
3280        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3281          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3282          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3283          (resp) -> resp.getPrevBalanceValue()))
3284      .call();
3285  }
3286
3287  @Override
3288  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3289    return this
3290        .<BalanceResponse> newMasterCaller()
3291        .action(
3292          (controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, BalanceResponse> call(controller,
3293            stub, ProtobufUtil.toBalanceRequest(request),
3294            (s, c, req, done) -> s.balance(c, req, done), (resp) -> ProtobufUtil.toBalanceResponse(resp))).call();
3295  }
3296
3297
3298  @Override
3299  public CompletableFuture<Boolean> isBalancerEnabled() {
3300    return this
3301        .<Boolean> newMasterCaller()
3302        .action((controller, stub) ->
3303              this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3304            stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3305                  -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3306  }
3307
3308  @Override
3309  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3310    return this
3311        .<Boolean> newMasterCaller()
3312        .action(
3313          (controller, stub) -> this
3314              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3315                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3316                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3317                    .getPrevNormalizerValue())).call();
3318  }
3319
3320  @Override
3321  public CompletableFuture<Boolean> isNormalizerEnabled() {
3322    return this
3323        .<Boolean> newMasterCaller()
3324        .action(
3325          (controller, stub) -> this
3326              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3327                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3328                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3329                (resp) -> resp.getEnabled())).call();
3330  }
3331
3332  @Override
3333  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3334    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3335  }
3336
3337  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3338    return this
3339      .<Boolean> newMasterCaller()
3340      .action(
3341        (controller, stub) -> this.call(
3342          controller, stub, request, MasterService.Interface::normalize,
3343          NormalizeResponse::getNormalizerRan))
3344      .call();
3345  }
3346
3347  @Override
3348  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3349    return this
3350        .<Boolean> newMasterCaller()
3351        .action(
3352          (controller, stub) -> this
3353              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3354                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3355                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3356                    .getPrevValue())).call();
3357  }
3358
3359  @Override
3360  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3361    return this
3362        .<Boolean> newMasterCaller()
3363        .action(
3364          (controller, stub) -> this
3365              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3366                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3367                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3368        .call();
3369  }
3370
3371  @Override
3372  public CompletableFuture<Boolean> runCleanerChore() {
3373    return this
3374        .<Boolean> newMasterCaller()
3375        .action(
3376          (controller, stub) -> this
3377              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3378                RequestConverter.buildRunCleanerChoreRequest(),
3379                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3380                (resp) -> resp.getCleanerChoreRan())).call();
3381  }
3382
3383  @Override
3384  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3385    return this
3386        .<Boolean> newMasterCaller()
3387        .action(
3388          (controller, stub) -> this
3389              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3390                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3391                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3392                    .getPrevValue())).call();
3393  }
3394
3395  @Override
3396  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3397    return this
3398        .<Boolean> newMasterCaller()
3399        .action(
3400          (controller, stub) -> this
3401              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3402                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3403                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3404                    .getValue())).call();
3405  }
3406
3407  @Override
3408  public CompletableFuture<Integer> runCatalogJanitor() {
3409    return this
3410        .<Integer> newMasterCaller()
3411        .action(
3412          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3413            controller, stub, RequestConverter.buildCatalogScanRequest(),
3414            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3415        .call();
3416  }
3417
3418  @Override
3419  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3420      ServiceCaller<S, R> callable) {
3421    MasterCoprocessorRpcChannelImpl channel =
3422        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3423    S stub = stubMaker.apply(channel);
3424    CompletableFuture<R> future = new CompletableFuture<>();
3425    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3426    callable.call(stub, controller, resp -> {
3427      if (controller.failed()) {
3428        future.completeExceptionally(controller.getFailed());
3429      } else {
3430        future.complete(resp);
3431      }
3432    });
3433    return future;
3434  }
3435
3436  @Override
3437  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3438      ServiceCaller<S, R> callable, ServerName serverName) {
3439    RegionServerCoprocessorRpcChannelImpl channel =
3440        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3441          serverName));
3442    S stub = stubMaker.apply(channel);
3443    CompletableFuture<R> future = new CompletableFuture<>();
3444    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3445    callable.call(stub, controller, resp -> {
3446      if (controller.failed()) {
3447        future.completeExceptionally(controller.getFailed());
3448      } else {
3449        future.complete(resp);
3450      }
3451    });
3452    return future;
3453  }
3454
3455  @Override
3456  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3457    return this.<List<ServerName>> newMasterCaller()
3458      .action((controller, stub) -> this
3459        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3460          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3461          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3462          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3463      .call();
3464  }
3465
3466  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3467    return this.connection.callerFactory.<T> serverRequest()
3468      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3469      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3470      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3471      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3472  }
3473
3474  @Override
3475  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3476    if (tableName == null) {
3477      return failedFuture(new IllegalArgumentException("Table name is null"));
3478    }
3479    CompletableFuture<Void> future = new CompletableFuture<>();
3480    addListener(tableExists(tableName), (exist, err) -> {
3481      if (err != null) {
3482        future.completeExceptionally(err);
3483        return;
3484      }
3485      if (!exist) {
3486        future.completeExceptionally(new TableNotFoundException(
3487          "Table '" + tableName.getNameAsString() + "' does not exists."));
3488        return;
3489      }
3490      addListener(getTableSplits(tableName), (splits, err1) -> {
3491        if (err1 != null) {
3492          future.completeExceptionally(err1);
3493        } else {
3494          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3495            if (err2 != null) {
3496              future.completeExceptionally(err2);
3497            } else {
3498              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3499                if (err3 != null) {
3500                  future.completeExceptionally(err3);
3501                } else {
3502                  future.complete(result3);
3503                }
3504              });
3505            }
3506          });
3507        }
3508      });
3509    });
3510    return future;
3511  }
3512
3513  @Override
3514  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3515    if (tableName == null) {
3516      return failedFuture(new IllegalArgumentException("Table name is null"));
3517    }
3518    CompletableFuture<Void> future = new CompletableFuture<>();
3519    addListener(tableExists(tableName), (exist, err) -> {
3520      if (err != null) {
3521        future.completeExceptionally(err);
3522        return;
3523      }
3524      if (!exist) {
3525        future.completeExceptionally(new TableNotFoundException(
3526          "Table '" + tableName.getNameAsString() + "' does not exists."));
3527        return;
3528      }
3529      addListener(setTableReplication(tableName, false), (result, err2) -> {
3530        if (err2 != null) {
3531          future.completeExceptionally(err2);
3532        } else {
3533          future.complete(result);
3534        }
3535      });
3536    });
3537    return future;
3538  }
3539
3540  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3541    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3542    addListener(
3543      getRegions(tableName).thenApply(regions -> regions.stream()
3544        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3545      (regions, err2) -> {
3546        if (err2 != null) {
3547          future.completeExceptionally(err2);
3548          return;
3549        }
3550        if (regions.size() == 1) {
3551          future.complete(null);
3552        } else {
3553          byte[][] splits = new byte[regions.size() - 1][];
3554          for (int i = 1; i < regions.size(); i++) {
3555            splits[i - 1] = regions.get(i).getStartKey();
3556          }
3557          future.complete(splits);
3558        }
3559      });
3560    return future;
3561  }
3562
3563  /**
3564   * Connect to peer and check the table descriptor on peer:
3565   * <ol>
3566   * <li>Create the same table on peer when not exist.</li>
3567   * <li>Throw an exception if the table already has replication enabled on any of the column
3568   * families.</li>
3569   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3570   * </ol>
3571   * @param tableName name of the table to sync to the peer
3572   * @param splits table split keys
3573   */
3574  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3575      byte[][] splits) {
3576    CompletableFuture<Void> future = new CompletableFuture<>();
3577    addListener(listReplicationPeers(), (peers, err) -> {
3578      if (err != null) {
3579        future.completeExceptionally(err);
3580        return;
3581      }
3582      if (peers == null || peers.size() <= 0) {
3583        future.completeExceptionally(
3584          new IllegalArgumentException("Found no peer cluster for replication."));
3585        return;
3586      }
3587      List<CompletableFuture<Void>> futures = new ArrayList<>();
3588      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3589        .forEach(peer -> {
3590          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3591        });
3592      addListener(
3593        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3594        (result, err2) -> {
3595          if (err2 != null) {
3596            future.completeExceptionally(err2);
3597          } else {
3598            future.complete(result);
3599          }
3600        });
3601    });
3602    return future;
3603  }
3604
3605  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3606      ReplicationPeerDescription peer) {
3607    Configuration peerConf = null;
3608    try {
3609      peerConf =
3610        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3611    } catch (IOException e) {
3612      return failedFuture(e);
3613    }
3614    CompletableFuture<Void> future = new CompletableFuture<>();
3615    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3616      if (err != null) {
3617        future.completeExceptionally(err);
3618        return;
3619      }
3620      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3621        if (err1 != null) {
3622          future.completeExceptionally(err1);
3623          return;
3624        }
3625        AsyncAdmin peerAdmin = conn.getAdmin();
3626        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3627          if (err2 != null) {
3628            future.completeExceptionally(err2);
3629            return;
3630          }
3631          if (!exist) {
3632            CompletableFuture<Void> createTableFuture = null;
3633            if (splits == null) {
3634              createTableFuture = peerAdmin.createTable(tableDesc);
3635            } else {
3636              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3637            }
3638            addListener(createTableFuture, (result, err3) -> {
3639              if (err3 != null) {
3640                future.completeExceptionally(err3);
3641              } else {
3642                future.complete(result);
3643              }
3644            });
3645          } else {
3646            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3647              (result, err4) -> {
3648                if (err4 != null) {
3649                  future.completeExceptionally(err4);
3650                } else {
3651                  future.complete(result);
3652                }
3653              });
3654          }
3655        });
3656      });
3657    });
3658    return future;
3659  }
3660
3661  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3662      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3663    CompletableFuture<Void> future = new CompletableFuture<>();
3664    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3665      if (err != null) {
3666        future.completeExceptionally(err);
3667        return;
3668      }
3669      if (peerTableDesc == null) {
3670        future.completeExceptionally(
3671          new IllegalArgumentException("Failed to get table descriptor for table " +
3672            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3673        return;
3674      }
3675      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3676        future.completeExceptionally(new IllegalArgumentException(
3677          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3678            ", but the table descriptors are not same when compared with source cluster." +
3679            " Thus can not enable the table's replication switch."));
3680        return;
3681      }
3682      future.complete(null);
3683    });
3684    return future;
3685  }
3686
3687  /**
3688   * Set the table's replication switch if the table's replication switch is already not set.
3689   * @param tableName name of the table
3690   * @param enableRep is replication switch enable or disable
3691   */
3692  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3693    CompletableFuture<Void> future = new CompletableFuture<>();
3694    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3695      if (err != null) {
3696        future.completeExceptionally(err);
3697        return;
3698      }
3699      if (!tableDesc.matchReplicationScope(enableRep)) {
3700        int scope =
3701          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3702        TableDescriptor newTableDesc =
3703          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3704        addListener(modifyTable(newTableDesc), (result, err2) -> {
3705          if (err2 != null) {
3706            future.completeExceptionally(err2);
3707          } else {
3708            future.complete(result);
3709          }
3710        });
3711      } else {
3712        future.complete(null);
3713      }
3714    });
3715    return future;
3716  }
3717
3718  @Override
3719  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3720    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3721    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3722      if (err != null) {
3723        future.completeExceptionally(err);
3724        return;
3725      }
3726      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3727        locations.stream().filter(l -> l.getRegion() != null)
3728          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3729          .collect(Collectors.groupingBy(l -> l.getServerName(),
3730            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3731      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3732      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3733      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3734        futures
3735          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3736            if (err2 != null) {
3737              future.completeExceptionally(unwrapCompletionException(err2));
3738            } else {
3739              aggregator.append(stats);
3740            }
3741          }));
3742      }
3743      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3744        (ret, err3) -> {
3745          if (err3 != null) {
3746            future.completeExceptionally(unwrapCompletionException(err3));
3747          } else {
3748            future.complete(aggregator.sum());
3749          }
3750        });
3751    });
3752    return future;
3753  }
3754
3755  @Override
3756  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3757      boolean preserveSplits) {
3758    CompletableFuture<Void> future = new CompletableFuture<>();
3759    addListener(tableExists(tableName), (exist, err) -> {
3760      if (err != null) {
3761        future.completeExceptionally(err);
3762        return;
3763      }
3764      if (!exist) {
3765        future.completeExceptionally(new TableNotFoundException(tableName));
3766        return;
3767      }
3768      addListener(tableExists(newTableName), (exist1, err1) -> {
3769        if (err1 != null) {
3770          future.completeExceptionally(err1);
3771          return;
3772        }
3773        if (exist1) {
3774          future.completeExceptionally(new TableExistsException(newTableName));
3775          return;
3776        }
3777        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3778          if (err2 != null) {
3779            future.completeExceptionally(err2);
3780            return;
3781          }
3782          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3783          if (preserveSplits) {
3784            addListener(getTableSplits(tableName), (splits, err3) -> {
3785              if (err3 != null) {
3786                future.completeExceptionally(err3);
3787              } else {
3788                addListener(
3789                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3790                  (result, err4) -> {
3791                    if (err4 != null) {
3792                      future.completeExceptionally(err4);
3793                    } else {
3794                      future.complete(result);
3795                    }
3796                  });
3797              }
3798            });
3799          } else {
3800            addListener(createTable(newTableDesc), (result, err5) -> {
3801              if (err5 != null) {
3802                future.completeExceptionally(err5);
3803              } else {
3804                future.complete(result);
3805              }
3806            });
3807          }
3808        });
3809      });
3810    });
3811    return future;
3812  }
3813
3814  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3815      List<RegionInfo> hris) {
3816    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3817      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3818        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3819        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3820        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3821      .serverName(serverName).call();
3822  }
3823
3824  @Override
3825  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3826    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3827        .action((controller, stub) -> this
3828            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3829              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3830              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3831              resp -> resp.getPreviousRpcThrottleEnabled()))
3832        .call();
3833    return future;
3834  }
3835
3836  @Override
3837  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3838    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3839        .action((controller, stub) -> this
3840            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3841              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3842              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3843              resp -> resp.getRpcThrottleEnabled()))
3844        .call();
3845    return future;
3846  }
3847
3848  @Override
3849  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3850    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3851        .action((controller, stub) -> this
3852            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3853              controller, stub,
3854              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3855                  .build(),
3856              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3857              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3858        .call();
3859    return future;
3860  }
3861
3862  @Override
3863  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3864    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3865      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3866      Map<TableName, Long>> call(controller, stub,
3867        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3868        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3869        resp -> resp.getSizesList().stream().collect(Collectors
3870          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3871      .call();
3872  }
3873
3874  @Override
3875  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3876      ServerName serverName) {
3877    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3878      .action((controller, stub) -> this
3879        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3880        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3881          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3882          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3883          resp -> resp.getSnapshotsList().stream()
3884            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3885              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3886      .serverName(serverName).call();
3887  }
3888
3889  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3890      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3891    return this.<SpaceQuotaSnapshot> newMasterCaller()
3892      .action((controller, stub) -> this
3893        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3894          RequestConverter.buildGetQuotaStatesRequest(),
3895          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3896      .call();
3897  }
3898
3899  @Override
3900  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3901    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3902      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3903      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3904  }
3905
3906  @Override
3907  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3908    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3909    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3910      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3911      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3912  }
3913
3914  @Override
3915  public CompletableFuture<Void> grant(UserPermission userPermission,
3916      boolean mergeExistingPermissions) {
3917    return this.<Void> newMasterCaller()
3918        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3919          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3920          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3921        .call();
3922  }
3923
3924  @Override
3925  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3926    return this.<Void> newMasterCaller()
3927        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3928          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3929          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3930        .call();
3931  }
3932
3933  @Override
3934  public CompletableFuture<List<UserPermission>>
3935      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3936    return this.<List<UserPermission>> newMasterCaller().action((controller,
3937        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3938            List<UserPermission>> call(controller, stub,
3939              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3940              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3941              resp -> resp.getUserPermissionList().stream()
3942                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3943                .collect(Collectors.toList())))
3944        .call();
3945  }
3946
3947  @Override
3948  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3949      List<Permission> permissions) {
3950    return this.<List<Boolean>> newMasterCaller()
3951        .action((controller, stub) -> this
3952            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3953              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3954              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3955              resp -> resp.getHasUserPermissionList()))
3956        .call();
3957  }
3958
3959  @Override
3960  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3961      final boolean sync) {
3962    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3963        .call(controller, stub, RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3964            MasterService.Interface::switchSnapshotCleanup,
3965            SetSnapshotCleanupResponse::getPrevSnapshotCleanup)).call();
3966  }
3967
3968  @Override
3969  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3970    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3971        .call(controller, stub, RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3972            MasterService.Interface::isSnapshotCleanupEnabled,
3973            IsSnapshotCleanupEnabledResponse::getEnabled)).call();
3974  }
3975
3976  @Override
3977  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
3978    return this.<Void> newMasterCaller()
3979        .action((controller, stub) -> this.
3980            <MoveServersRequest, MoveServersResponse, Void> call(controller, stub,
3981                RequestConverter.buildMoveServersRequest(servers, groupName),
3982              (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
3983        .call();
3984  }
3985
3986  @Override
3987  public CompletableFuture<Void> addRSGroup(String groupName) {
3988    return this.<Void> newMasterCaller()
3989        .action(((controller, stub) -> this.
3990            <AddRSGroupRequest, AddRSGroupResponse, Void> call(controller, stub,
3991                AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3992              (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)))
3993        .call();
3994  }
3995
3996  @Override
3997  public CompletableFuture<Void> removeRSGroup(String groupName) {
3998    return this.<Void> newMasterCaller()
3999        .action((controller, stub) -> this.
4000            <RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(controller, stub,
4001                RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4002              (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4003        .call();
4004  }
4005
4006  @Override
4007  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4008    BalanceRequest request) {
4009    return this.<BalanceResponse>newMasterCaller().action(
4010        (controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, BalanceResponse>call(
4011          controller, stub, ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4012          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4013      .call();
4014  }
4015
4016  @Override
4017  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4018    return this.<List<RSGroupInfo>> newMasterCaller()
4019        .action((controller, stub) -> this
4020            .<ListRSGroupInfosRequest, ListRSGroupInfosResponse, List<RSGroupInfo>> call(
4021                controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4022              (s, c, req, done) -> s.listRSGroupInfos(c, req, done),
4023              resp -> resp.getRSGroupInfoList().stream()
4024                  .map(r -> ProtobufUtil.toGroupInfo(r))
4025                  .collect(Collectors.toList())))
4026        .call();
4027  }
4028
4029  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4030      final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4031      final String logType) {
4032    if (CollectionUtils.isEmpty(serverNames)) {
4033      return CompletableFuture.completedFuture(Collections.emptyList());
4034    }
4035    return CompletableFuture.supplyAsync(() -> serverNames.stream()
4036      .map((ServerName serverName) ->
4037        getSlowLogResponseFromServer(serverName, filterParams, limit, logType))
4038      .map(CompletableFuture::join)
4039      .flatMap(List::stream)
4040      .collect(Collectors.toList()));
4041  }
4042
4043  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4044      Map<String, Object> filterParams, int limit, String logType) {
4045    return this.<List<LogEntry>>newAdminCaller().action((controller, stub) -> this
4046      .adminCall(controller, stub,
4047        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4048        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4049      .serverName(serverName).call();
4050  }
4051
4052  @Override
4053  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
4054      @Nullable Set<ServerName> serverNames) {
4055    if (CollectionUtils.isEmpty(serverNames)) {
4056      return CompletableFuture.completedFuture(Collections.emptyList());
4057    }
4058    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
4059      .map(this::clearSlowLogsResponses)
4060      .collect(Collectors.toList());
4061    return convertToFutureOfList(clearSlowLogResponseList);
4062  }
4063
4064  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4065    return this.<Boolean>newAdminCaller()
4066      .action(((controller, stub) -> this
4067        .adminCall(
4068          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
4069          AdminService.Interface::clearSlowLogsResponses,
4070          ProtobufUtil::toClearSlowLogPayload))
4071      ).serverName(serverName).call();
4072  }
4073
4074  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
4075    List<CompletableFuture<T>> futures) {
4076    CompletableFuture<Void> allDoneFuture =
4077      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4078    return allDoneFuture.thenApply(v ->
4079      futures.stream()
4080        .map(CompletableFuture::join)
4081        .collect(Collectors.toList())
4082    );
4083  }
4084
4085  @Override
4086  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4087    return this.<List<TableName>> newMasterCaller()
4088      .action((controller, stub) -> this
4089        .<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, List<TableName>> call(controller,
4090          stub, ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4091          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4092            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4093      .call();
4094  }
4095
4096  @Override
4097  public CompletableFuture<Pair<List<String>, List<TableName>>>
4098    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4099    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4100      .action((controller, stub) -> this
4101        .<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4102          GetConfiguredNamespacesAndTablesInRSGroupResponse,
4103          Pair<List<String>, List<TableName>>> call(controller, stub,
4104          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4105            .build(),
4106            (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4107            resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4108              .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4109      .call();
4110  }
4111
4112  @Override
4113  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4114    return this.<RSGroupInfo> newMasterCaller()
4115      .action(((controller, stub) -> this
4116        .<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse, RSGroupInfo> call(
4117          controller, stub,
4118          GetRSGroupInfoOfServerRequest.newBuilder()
4119            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4120              .setPort(hostPort.getPort()).build())
4121            .build(),
4122          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4123          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4124      .call();
4125  }
4126
4127  @Override
4128  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4129    return this.<Void> newMasterCaller()
4130      .action((controller, stub) -> this.
4131            <RemoveServersRequest, RemoveServersResponse, Void> call(controller, stub,
4132                RequestConverter.buildRemoveServersRequest(servers),
4133              (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4134        .call();
4135  }
4136
4137  @Override
4138  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4139    CompletableFuture<Void> future = new CompletableFuture<>();
4140    for (TableName tableName : tables) {
4141      addListener(tableExists(tableName), (exist, err) -> {
4142        if (err != null) {
4143          future.completeExceptionally(err);
4144          return;
4145        }
4146        if (!exist) {
4147          future.completeExceptionally(new TableNotFoundException(tableName));
4148          return;
4149        }
4150      });
4151    }
4152    addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> {
4153      if (err != null) {
4154        future.completeExceptionally(err);
4155        return;
4156      }
4157      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4158        future.complete(null);
4159        return;
4160      }
4161      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4162      for (TableDescriptor td : tableDescriptions) {
4163        newTableDescriptors
4164            .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4165      }
4166      addListener(CompletableFuture.allOf(
4167        newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4168        (v, e) -> {
4169          if (e != null) {
4170            future.completeExceptionally(e);
4171          } else {
4172            future.complete(v);
4173          }
4174        });
4175    }));
4176    return future;
4177  }
4178
4179  @Override
4180  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4181    return this.<RSGroupInfo> newMasterCaller().action(((controller, stub) -> this
4182      .<GetRSGroupInfoOfTableRequest, GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller,
4183        stub,
4184        GetRSGroupInfoOfTableRequest.newBuilder().setTableName(ProtobufUtil.toProtoTableName(table))
4185          .build(),
4186        (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4187        resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4188      .call();
4189  }
4190
4191  @Override
4192  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4193    return this.<RSGroupInfo> newMasterCaller()
4194      .action(((controller, stub) -> this
4195        .<GetRSGroupInfoRequest, GetRSGroupInfoResponse, RSGroupInfo> call(controller, stub,
4196          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4197          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4198          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4199      .call();
4200  }
4201
4202  @Override
4203  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4204    return this.<Void> newMasterCaller()
4205      .action(
4206        (
4207          (controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4208            controller,
4209            stub,
4210            RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName).setNewRsgroupName(newName)
4211                                .build(),
4212            (s, c, req, done) -> s.renameRSGroup(c, req, done),
4213            resp -> null
4214          )
4215        )
4216      ).call();
4217  }
4218
4219  @Override
4220  public CompletableFuture<Void>
4221    updateRSGroupConfig(String groupName, Map<String, String> configuration) {
4222    UpdateRSGroupConfigRequest.Builder request = UpdateRSGroupConfigRequest.newBuilder()
4223        .setGroupName(groupName);
4224    if (configuration != null) {
4225      configuration.entrySet().forEach(e ->
4226          request.addConfiguration(NameStringPair.newBuilder().setName(e.getKey())
4227              .setValue(e.getValue()).build()));
4228    }
4229    return this.<Void> newMasterCaller()
4230        .action(((controller, stub) ->
4231            this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
4232                controller, stub, request.build(),
4233              (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4234        ).call();
4235  }
4236
4237  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4238    return this.<List<LogEntry>>newMasterCaller()
4239      .action((controller, stub) ->
4240        this.call(controller, stub,
4241          ProtobufUtil.toBalancerDecisionRequest(limit),
4242          MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse))
4243      .call();
4244  }
4245
4246  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4247    return this.<List<LogEntry>>newMasterCaller()
4248      .action((controller, stub) ->
4249        this.call(controller, stub,
4250          ProtobufUtil.toBalancerRejectionRequest(limit),
4251          MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerRejectionResponse))
4252      .call();
4253  }
4254
4255  @Override
4256  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4257      String logType, ServerType serverType, int limit,
4258      Map<String, Object> filterParams) {
4259    if (logType == null || serverType == null) {
4260      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4261    }
4262    switch (logType){
4263      case "SLOW_LOG":
4264      case "LARGE_LOG":
4265        if (ServerType.MASTER.equals(serverType)) {
4266          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4267        }
4268        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4269      case "BALANCER_DECISION":
4270        if (ServerType.REGION_SERVER.equals(serverType)) {
4271          throw new IllegalArgumentException(
4272            "Balancer Decision logs are not maintained by HRegionServer");
4273        }
4274        return getBalancerDecisions(limit);
4275      case "BALANCER_REJECTION":
4276        if (ServerType.REGION_SERVER.equals(serverType)) {
4277          throw new IllegalArgumentException(
4278            "Balancer Rejection logs are not maintained by HRegionServer");
4279        }
4280        return getBalancerRejections(limit);
4281      default:
4282        return CompletableFuture.completedFuture(Collections.emptyList());
4283    }
4284  }
4285
4286}