001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.CatalogFamilyFormat;
052import org.apache.hadoop.hbase.ClientMetaTableAccessor;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.NamespaceDescriptor;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.RegionMetrics;
061import org.apache.hadoop.hbase.RegionMetricsBuilder;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableExistsException;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.TableNotDisabledException;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.UnknownRegionException;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.Scan.ReadType;
073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
074import org.apache.hadoop.hbase.client.replication.TableCFs;
075import org.apache.hadoop.hbase.client.security.SecurityCapability;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.ipc.HBaseRpcController;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.quotas.QuotaFilter;
080import org.apache.hadoop.hbase.quotas.QuotaSettings;
081import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
083import org.apache.hadoop.hbase.replication.ReplicationException;
084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
086import org.apache.hadoop.hbase.replication.SyncReplicationState;
087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
104import org.apache.hbase.thirdparty.com.google.protobuf.Message;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
107import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
108import org.apache.hbase.thirdparty.io.netty.util.Timeout;
109import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
110import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
111
112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
332
333/**
334 * The implementation of AsyncAdmin.
335 * <p>
336 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
337 * be finished inside the rpc framework thread, which means that the callbacks registered to the
338 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
339 * this class should not try to do time consuming tasks in the callbacks.
340 * @since 2.0.0
341 * @see AsyncHBaseAdmin
342 * @see AsyncConnection#getAdmin()
343 * @see AsyncConnection#getAdminBuilder()
344 */
345@InterfaceAudience.Private
346class RawAsyncHBaseAdmin implements AsyncAdmin {
347
348  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
349
350  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
351
352  private final AsyncConnectionImpl connection;
353
354  private final HashedWheelTimer retryTimer;
355
356  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
357
358  private final long rpcTimeoutNs;
359
360  private final long operationTimeoutNs;
361
362  private final long pauseNs;
363
364  private final long pauseForCQTBENs;
365
366  private final int maxAttempts;
367
368  private final int startLogErrorsCnt;
369
370  private final NonceGenerator ng;
371
372  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
373      AsyncAdminBuilderBase builder) {
374    this.connection = connection;
375    this.retryTimer = retryTimer;
376    this.metaTable = connection.getTable(META_TABLE_NAME);
377    this.rpcTimeoutNs = builder.rpcTimeoutNs;
378    this.operationTimeoutNs = builder.operationTimeoutNs;
379    this.pauseNs = builder.pauseNs;
380    if (builder.pauseForCQTBENs < builder.pauseNs) {
381      LOG.warn(
382        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
383          " the normal pause value {} ms, use the greater one instead",
384        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
385        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
386      this.pauseForCQTBENs = builder.pauseNs;
387    } else {
388      this.pauseForCQTBENs = builder.pauseForCQTBENs;
389    }
390    this.maxAttempts = builder.maxAttempts;
391    this.startLogErrorsCnt = builder.startLogErrorsCnt;
392    this.ng = connection.getNonceGenerator();
393  }
394
395  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
396    return this.connection.callerFactory.<T> masterRequest()
397      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
398      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
399      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
400      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
401  }
402
403  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
404    return this.connection.callerFactory.<T> adminRequest()
405      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
406      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
407      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
408      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
409  }
410
411  @FunctionalInterface
412  private interface MasterRpcCall<RESP, REQ> {
413    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
414        RpcCallback<RESP> done);
415  }
416
417  @FunctionalInterface
418  private interface AdminRpcCall<RESP, REQ> {
419    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
420        RpcCallback<RESP> done);
421  }
422
423  @FunctionalInterface
424  private interface Converter<D, S> {
425    D convert(S src) throws IOException;
426  }
427
428  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
429      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
430      Converter<RESP, PRESP> respConverter) {
431    CompletableFuture<RESP> future = new CompletableFuture<>();
432    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
433
434      @Override
435      public void run(PRESP resp) {
436        if (controller.failed()) {
437          future.completeExceptionally(controller.getFailed());
438        } else {
439          try {
440            future.complete(respConverter.convert(resp));
441          } catch (IOException e) {
442            future.completeExceptionally(e);
443          }
444        }
445      }
446    });
447    return future;
448  }
449
450  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
451      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
452      Converter<RESP, PRESP> respConverter) {
453    CompletableFuture<RESP> future = new CompletableFuture<>();
454    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
455
456      @Override
457      public void run(PRESP resp) {
458        if (controller.failed()) {
459          future.completeExceptionally(controller.getFailed());
460        } else {
461          try {
462            future.complete(respConverter.convert(resp));
463          } catch (IOException e) {
464            future.completeExceptionally(e);
465          }
466        }
467      }
468    });
469    return future;
470  }
471
472  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
473      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
474      ProcedureBiConsumer consumer) {
475    return procedureCall(b -> {
476    }, preq, rpcCall, respConverter, consumer);
477  }
478
479  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
480      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
481      ProcedureBiConsumer consumer) {
482    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
483  }
484
485  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
486      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
487      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
488      ProcedureBiConsumer consumer) {
489    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
490        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
491    prioritySetter.accept(builder);
492    CompletableFuture<Long> procFuture = builder.call();
493    CompletableFuture<Void> future = waitProcedureResult(procFuture);
494    addListener(future, consumer);
495    return future;
496  }
497
498  @FunctionalInterface
499  private interface TableOperator {
500    CompletableFuture<Void> operate(TableName table);
501  }
502
503  @Override
504  public CompletableFuture<Boolean> tableExists(TableName tableName) {
505    if (TableName.isMetaTableName(tableName)) {
506      return CompletableFuture.completedFuture(true);
507    }
508    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
509  }
510
511  @Override
512  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
513    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
514      includeSysTables));
515  }
516
517  /**
518   * {@link #listTableDescriptors(boolean)}
519   */
520  @Override
521  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
522      boolean includeSysTables) {
523    Preconditions.checkNotNull(pattern,
524      "pattern is null. If you don't specify a pattern, "
525          + "use listTableDescriptors(boolean) instead");
526    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
527      includeSysTables));
528  }
529
530  @Override
531  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
532    Preconditions.checkNotNull(tableNames,
533      "tableNames is null. If you don't specify tableNames, "
534          + "use listTableDescriptors(boolean) instead");
535    if (tableNames.isEmpty()) {
536      return CompletableFuture.completedFuture(Collections.emptyList());
537    }
538    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
539  }
540
541  private CompletableFuture<List<TableDescriptor>>
542      getTableDescriptors(GetTableDescriptorsRequest request) {
543    return this.<List<TableDescriptor>> newMasterCaller()
544        .action((controller, stub) -> this
545            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
546              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
547              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
548        .call();
549  }
550
551  @Override
552  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
553    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
554  }
555
556  @Override
557  public CompletableFuture<List<TableName>>
558      listTableNames(Pattern pattern, boolean includeSysTables) {
559    Preconditions.checkNotNull(pattern,
560        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
561    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
562  }
563
564  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
565    return this
566        .<List<TableName>> newMasterCaller()
567        .action(
568          (controller, stub) -> this
569              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
570                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
571                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
572  }
573
574  @Override
575  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
576    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
577        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
578        List<TableDescriptor>> call(
579          controller, stub,
580          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
581          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
582          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
583        .call();
584  }
585
586  @Override
587  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
588    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
589        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
590        List<TableName>> call(
591          controller, stub,
592          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
593          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
594          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
595        .call();
596  }
597
598  @Override
599  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
600    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
601    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
602      .action((controller, stub) -> this
603        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
604          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
605          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
606          (resp) -> resp.getTableSchemaList()))
607      .call(), (tableSchemas, error) -> {
608        if (error != null) {
609          future.completeExceptionally(error);
610          return;
611        }
612        if (!tableSchemas.isEmpty()) {
613          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
614        } else {
615          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
616        }
617      });
618    return future;
619  }
620
621  @Override
622  public CompletableFuture<Void> createTable(TableDescriptor desc) {
623    return createTable(desc.getTableName(),
624      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
625  }
626
627  @Override
628  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
629      int numRegions) {
630    try {
631      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
632    } catch (IllegalArgumentException e) {
633      return failedFuture(e);
634    }
635  }
636
637  @Override
638  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
639    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
640        + " use createTable(TableDescriptor) instead");
641    try {
642      verifySplitKeys(splitKeys);
643      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
644        splitKeys, ng.getNonceGroup(), ng.newNonce()));
645    } catch (IllegalArgumentException e) {
646      return failedFuture(e);
647    }
648  }
649
650  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
651    Preconditions.checkNotNull(tableName, "table name is null");
652    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
653      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
654      new CreateTableProcedureBiConsumer(tableName));
655  }
656
657  @Override
658  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
659    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
660      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
661        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
662      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
663  }
664
665  @Override
666  public CompletableFuture<Void> deleteTable(TableName tableName) {
667    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
668      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
669      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
670      new DeleteTableProcedureBiConsumer(tableName));
671  }
672
673  @Override
674  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
675    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
676      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
677        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
678      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
679  }
680
681  @Override
682  public CompletableFuture<Void> enableTable(TableName tableName) {
683    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
684      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
685      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
686      new EnableTableProcedureBiConsumer(tableName));
687  }
688
689  @Override
690  public CompletableFuture<Void> disableTable(TableName tableName) {
691    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
692      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
693      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
694      new DisableTableProcedureBiConsumer(tableName));
695  }
696
697  /**
698   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
699   * using passed parameters. Sets error or boolean result ('true' if table matches
700   * the passed-in targetState).
701   */
702  private static CompletableFuture<Boolean> completeCheckTableState(
703      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
704      TableState.State targetState, TableName tableName) {
705    if (error != null) {
706      future.completeExceptionally(error);
707    } else {
708      if (tableState != null) {
709        future.complete(tableState.inStates(targetState));
710      } else {
711        future.completeExceptionally(new TableNotFoundException(tableName));
712      }
713    }
714    return future;
715  }
716
717  @Override
718  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
719    if (TableName.isMetaTableName(tableName)) {
720      return CompletableFuture.completedFuture(true);
721    }
722    CompletableFuture<Boolean> future = new CompletableFuture<>();
723    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
724      (tableState, error) -> {
725        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
726          TableState.State.ENABLED, tableName);
727      });
728    return future;
729  }
730
731  @Override
732  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
733    if (TableName.isMetaTableName(tableName)) {
734      return CompletableFuture.completedFuture(false);
735    }
736    CompletableFuture<Boolean> future = new CompletableFuture<>();
737    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
738      (tableState, error) -> {
739        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
740          TableState.State.DISABLED, tableName);
741      });
742    return future;
743  }
744
745  @Override
746  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
747    if (TableName.isMetaTableName(tableName)) {
748      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
749        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
750    }
751    CompletableFuture<Boolean> future = new CompletableFuture<>();
752    addListener(isTableEnabled(tableName), (enabled, error) -> {
753      if (error != null) {
754        if (error instanceof TableNotFoundException) {
755          future.complete(false);
756        } else {
757          future.completeExceptionally(error);
758        }
759        return;
760      }
761      if (!enabled) {
762        future.complete(false);
763      } else {
764        addListener(
765          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
766          (locations, error1) -> {
767            if (error1 != null) {
768              future.completeExceptionally(error1);
769              return;
770            }
771            List<HRegionLocation> notDeployedRegions = locations.stream()
772              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
773            if (notDeployedRegions.size() > 0) {
774              if (LOG.isDebugEnabled()) {
775                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
776              }
777              future.complete(false);
778              return;
779            }
780            future.complete(true);
781          });
782      }
783    });
784    return future;
785  }
786
787  @Override
788  public CompletableFuture<Void> addColumnFamily(
789      TableName tableName, ColumnFamilyDescriptor columnFamily) {
790    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
791      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
792        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
793      new AddColumnFamilyProcedureBiConsumer(tableName));
794  }
795
796  @Override
797  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
798    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
799      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
800        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
801      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
802  }
803
804  @Override
805  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
806      ColumnFamilyDescriptor columnFamily) {
807    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
808      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
809        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
810      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
811  }
812
813  @Override
814  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
815    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
816      RequestConverter.buildCreateNamespaceRequest(descriptor),
817      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
818      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
819  }
820
821  @Override
822  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
823    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
824      RequestConverter.buildModifyNamespaceRequest(descriptor),
825      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
826      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
827  }
828
829  @Override
830  public CompletableFuture<Void> deleteNamespace(String name) {
831    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
832      RequestConverter.buildDeleteNamespaceRequest(name),
833      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
834      new DeleteNamespaceProcedureBiConsumer(name));
835  }
836
837  @Override
838  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
839    return this
840        .<NamespaceDescriptor> newMasterCaller()
841        .action(
842          (controller, stub) -> this
843              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
844                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
845                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
846                      -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
847  }
848
849  @Override
850  public CompletableFuture<List<String>> listNamespaces() {
851    return this
852        .<List<String>> newMasterCaller()
853        .action(
854          (controller, stub) -> this
855              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
856                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
857                  done) -> s.listNamespaces(c, req, done),
858                (resp) -> resp.getNamespaceNameList())).call();
859  }
860
861  @Override
862  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
863    return this
864        .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
865              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
866                  List<NamespaceDescriptor>> call(controller, stub,
867                  ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
868                      s.listNamespaceDescriptors(c, req, done),
869                    (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
870  }
871
872  @Override
873  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
874    return this.<List<RegionInfo>> newAdminCaller()
875        .action((controller, stub) -> this
876            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
877              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
878              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
879              resp -> ProtobufUtil.getRegionInfos(resp)))
880        .serverName(serverName).call();
881  }
882
883  @Override
884  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
885    if (tableName.equals(META_TABLE_NAME)) {
886      return connection.registry.getMetaRegionLocations()
887        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
888          .collect(Collectors.toList()));
889    } else {
890      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
891        .thenApply(
892          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
893    }
894  }
895  @Override
896  public CompletableFuture<Void> flush(TableName tableName) {
897    return flush(tableName, null);
898  }
899
900  @Override
901  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
902    CompletableFuture<Void> future = new CompletableFuture<>();
903    addListener(tableExists(tableName), (exists, err) -> {
904      if (err != null) {
905        future.completeExceptionally(err);
906      } else if (!exists) {
907        future.completeExceptionally(new TableNotFoundException(tableName));
908      } else {
909        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
910          if (err2 != null) {
911            future.completeExceptionally(err2);
912          } else if (!tableEnabled) {
913            future.completeExceptionally(new TableNotEnabledException(tableName));
914          } else {
915            Map<String, String> props = new HashMap<>();
916            if (columnFamily != null) {
917              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
918            }
919            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
920              props), (ret, err3) -> {
921                if (err3 != null) {
922                  future.completeExceptionally(err3);
923                } else {
924                  future.complete(ret);
925                }
926              });
927          }
928        });
929      }
930    });
931    return future;
932  }
933
934  @Override
935  public CompletableFuture<Void> flushRegion(byte[] regionName) {
936    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
937    });
938  }
939
940  @Override
941  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
942    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
943      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
944    return flushRegionInternal(regionName, columnFamily, false)
945      .thenAccept(r -> {});
946  }
947
948  /**
949   * This method is for internal use only, where we need the response of the flush.
950   * <p/>
951   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
952   * API.
953   */
954  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
955    byte[] columnFamily, boolean writeFlushWALMarker) {
956    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
957    addListener(getRegionLocation(regionName), (location, err) -> {
958      if (err != null) {
959        future.completeExceptionally(err);
960        return;
961      }
962      ServerName serverName = location.getServerName();
963      if (serverName == null) {
964        future
965          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
966        return;
967      }
968      addListener(
969        flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
970        (ret, err2) -> {
971          if (err2 != null) {
972            future.completeExceptionally(err2);
973          } else {
974            future.complete(ret);
975          }});
976    });
977    return future;
978  }
979
980  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
981    byte[] columnFamily, boolean writeFlushWALMarker) {
982    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
983      .action((controller, stub) -> this
984        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
985          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(),
986            columnFamily, writeFlushWALMarker),
987          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
988      .call();
989  }
990
991  @Override
992  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
993    CompletableFuture<Void> future = new CompletableFuture<>();
994    addListener(getRegions(sn), (hRegionInfos, err) -> {
995      if (err != null) {
996        future.completeExceptionally(err);
997        return;
998      }
999      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1000      if (hRegionInfos != null) {
1001        hRegionInfos.forEach(
1002          region -> compactFutures.add(
1003            flush(sn, region, null, false).thenAccept(r -> {})
1004          )
1005        );
1006      }
1007      addListener(CompletableFuture.allOf(
1008        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1009          if (err2 != null) {
1010            future.completeExceptionally(err2);
1011          } else {
1012            future.complete(ret);
1013          }
1014        });
1015    });
1016    return future;
1017  }
1018
1019  @Override
1020  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1021    return compact(tableName, null, false, compactType);
1022  }
1023
1024  @Override
1025  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1026      CompactType compactType) {
1027    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1028        + "If you don't specify a columnFamily, use compact(TableName) instead");
1029    return compact(tableName, columnFamily, false, compactType);
1030  }
1031
1032  @Override
1033  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1034    return compactRegion(regionName, null, false);
1035  }
1036
1037  @Override
1038  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1039    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1040        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1041    return compactRegion(regionName, columnFamily, false);
1042  }
1043
1044  @Override
1045  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1046    return compact(tableName, null, true, compactType);
1047  }
1048
1049  @Override
1050  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1051      CompactType compactType) {
1052    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1053        + "If you don't specify a columnFamily, use compact(TableName) instead");
1054    return compact(tableName, columnFamily, true, compactType);
1055  }
1056
1057  @Override
1058  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1059    return compactRegion(regionName, null, true);
1060  }
1061
1062  @Override
1063  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1064    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1065        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1066    return compactRegion(regionName, columnFamily, true);
1067  }
1068
1069  @Override
1070  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1071    return compactRegionServer(sn, false);
1072  }
1073
1074  @Override
1075  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1076    return compactRegionServer(sn, true);
1077  }
1078
1079  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1080    CompletableFuture<Void> future = new CompletableFuture<>();
1081    addListener(getRegions(sn), (hRegionInfos, err) -> {
1082      if (err != null) {
1083        future.completeExceptionally(err);
1084        return;
1085      }
1086      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1087      if (hRegionInfos != null) {
1088        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1089      }
1090      addListener(CompletableFuture.allOf(
1091        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1092          if (err2 != null) {
1093            future.completeExceptionally(err2);
1094          } else {
1095            future.complete(ret);
1096          }
1097        });
1098    });
1099    return future;
1100  }
1101
1102  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1103      boolean major) {
1104    CompletableFuture<Void> future = new CompletableFuture<>();
1105    addListener(getRegionLocation(regionName), (location, err) -> {
1106      if (err != null) {
1107        future.completeExceptionally(err);
1108        return;
1109      }
1110      ServerName serverName = location.getServerName();
1111      if (serverName == null) {
1112        future
1113          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1114        return;
1115      }
1116      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1117        (ret, err2) -> {
1118          if (err2 != null) {
1119            future.completeExceptionally(err2);
1120          } else {
1121            future.complete(ret);
1122          }
1123        });
1124    });
1125    return future;
1126  }
1127
1128  /**
1129   * List all region locations for the specific table.
1130   */
1131  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1132    if (TableName.META_TABLE_NAME.equals(tableName)) {
1133      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1134      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1135        if (err != null) {
1136          future.completeExceptionally(err);
1137        } else if (metaRegions == null || metaRegions.isEmpty() ||
1138          metaRegions.getDefaultRegionLocation() == null) {
1139          future.completeExceptionally(new IOException("meta region does not found"));
1140        } else {
1141          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1142        }
1143      });
1144      return future;
1145    } else {
1146      // For non-meta table, we fetch all locations by scanning hbase:meta table
1147      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1148    }
1149  }
1150
1151  /**
1152   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1153   */
1154  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1155      CompactType compactType) {
1156    CompletableFuture<Void> future = new CompletableFuture<>();
1157
1158    switch (compactType) {
1159      case MOB:
1160        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1161          if (err != null) {
1162            future.completeExceptionally(err);
1163            return;
1164          }
1165          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1166          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1167            if (err2 != null) {
1168              future.completeExceptionally(err2);
1169            } else {
1170              future.complete(ret);
1171            }
1172          });
1173        });
1174        break;
1175      case NORMAL:
1176        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1177          if (err != null) {
1178            future.completeExceptionally(err);
1179            return;
1180          }
1181          if (locations == null || locations.isEmpty()) {
1182            future.completeExceptionally(new TableNotFoundException(tableName));
1183          }
1184          CompletableFuture<?>[] compactFutures =
1185            locations.stream().filter(l -> l.getRegion() != null)
1186              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1187              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1188              .toArray(CompletableFuture<?>[]::new);
1189          // future complete unless all of the compact futures are completed.
1190          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1191            if (err2 != null) {
1192              future.completeExceptionally(err2);
1193            } else {
1194              future.complete(ret);
1195            }
1196          });
1197        });
1198        break;
1199      default:
1200        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1201    }
1202    return future;
1203  }
1204
1205  /**
1206   * Compact the region at specific region server.
1207   */
1208  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1209      final boolean major, byte[] columnFamily) {
1210    return this
1211        .<Void> newAdminCaller()
1212        .serverName(sn)
1213        .action(
1214          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1215            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1216              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1217            resp -> null)).call();
1218  }
1219
1220  private byte[] toEncodeRegionName(byte[] regionName) {
1221    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1222      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1223  }
1224
1225  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1226      CompletableFuture<TableName> result) {
1227    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1228      if (err != null) {
1229        result.completeExceptionally(err);
1230        return;
1231      }
1232      RegionInfo regionInfo = location.getRegion();
1233      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1234        result.completeExceptionally(
1235          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1236        return;
1237      }
1238      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1239        if (!tableName.get().equals(regionInfo.getTable())) {
1240          // tables of this two region should be same.
1241          result.completeExceptionally(
1242            new IllegalArgumentException("Cannot merge regions from two different tables " +
1243              tableName.get() + " and " + regionInfo.getTable()));
1244        } else {
1245          result.complete(tableName.get());
1246        }
1247      }
1248    });
1249  }
1250
1251  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1252    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1253    CompletableFuture<TableName> future = new CompletableFuture<>();
1254    for (byte[] encodedRegionName : encodedRegionNames) {
1255      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1256    }
1257    return future;
1258  }
1259
1260  @Override
1261  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1262    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1263  }
1264
1265  @Override
1266  public CompletableFuture<Boolean> isMergeEnabled() {
1267    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1268  }
1269
1270  @Override
1271  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1272    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1273  }
1274
1275  @Override
1276  public CompletableFuture<Boolean> isSplitEnabled() {
1277    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1278  }
1279
1280  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1281      MasterSwitchType switchType) {
1282    SetSplitOrMergeEnabledRequest request =
1283      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1284    return this.<Boolean> newMasterCaller()
1285      .action((controller, stub) -> this
1286        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1287          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1288          (resp) -> resp.getPrevValueList().get(0)))
1289      .call();
1290  }
1291
1292  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1293    IsSplitOrMergeEnabledRequest request =
1294        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1295    return this
1296        .<Boolean> newMasterCaller()
1297        .action(
1298          (controller, stub) -> this
1299              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1300                controller, stub, request,
1301                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1302                (resp) -> resp.getEnabled())).call();
1303  }
1304
1305  @Override
1306  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1307    if (nameOfRegionsToMerge.size() < 2) {
1308      return failedFuture(new IllegalArgumentException(
1309        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1310    }
1311    CompletableFuture<Void> future = new CompletableFuture<>();
1312    byte[][] encodedNameOfRegionsToMerge =
1313      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1314
1315    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1316      if (err != null) {
1317        future.completeExceptionally(err);
1318        return;
1319      }
1320
1321      final MergeTableRegionsRequest request;
1322      try {
1323        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1324          forcible, ng.getNonceGroup(), ng.newNonce());
1325      } catch (DeserializationException e) {
1326        future.completeExceptionally(e);
1327        return;
1328      }
1329
1330      addListener(
1331        this.procedureCall(tableName, request,
1332          MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
1333          new MergeTableRegionProcedureBiConsumer(tableName)),
1334        (ret, err2) -> {
1335          if (err2 != null) {
1336            future.completeExceptionally(err2);
1337          } else {
1338            future.complete(ret);
1339          }
1340        });
1341    });
1342    return future;
1343  }
1344
1345  @Override
1346  public CompletableFuture<Void> split(TableName tableName) {
1347    CompletableFuture<Void> future = new CompletableFuture<>();
1348    addListener(tableExists(tableName), (exist, error) -> {
1349      if (error != null) {
1350        future.completeExceptionally(error);
1351        return;
1352      }
1353      if (!exist) {
1354        future.completeExceptionally(new TableNotFoundException(tableName));
1355        return;
1356      }
1357      addListener(metaTable
1358        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1359          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1360            ClientMetaTableAccessor.QueryType.REGION))
1361          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1362            ClientMetaTableAccessor.QueryType.REGION))),
1363        (results, err2) -> {
1364          if (err2 != null) {
1365            future.completeExceptionally(err2);
1366            return;
1367          }
1368          if (results != null && !results.isEmpty()) {
1369            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1370            for (Result r : results) {
1371              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1372                continue;
1373              }
1374              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1375              if (rl != null) {
1376                for (HRegionLocation h : rl.getRegionLocations()) {
1377                  if (h != null && h.getServerName() != null) {
1378                    RegionInfo hri = h.getRegion();
1379                    if (hri == null || hri.isSplitParent() ||
1380                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1381                      continue;
1382                    }
1383                    splitFutures.add(split(hri, null));
1384                  }
1385                }
1386              }
1387            }
1388            addListener(
1389              CompletableFuture
1390                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1391              (ret, exception) -> {
1392                if (exception != null) {
1393                  future.completeExceptionally(exception);
1394                  return;
1395                }
1396                future.complete(ret);
1397              });
1398          } else {
1399            future.complete(null);
1400          }
1401        });
1402    });
1403    return future;
1404  }
1405
1406  @Override
1407  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1408    CompletableFuture<Void> result = new CompletableFuture<>();
1409    if (splitPoint == null) {
1410      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1411    }
1412    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1413      (loc, err) -> {
1414        if (err != null) {
1415          result.completeExceptionally(err);
1416        } else if (loc == null || loc.getRegion() == null) {
1417          result.completeExceptionally(new IllegalArgumentException(
1418            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1419        } else {
1420          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1421            if (err2 != null) {
1422              result.completeExceptionally(err2);
1423            } else {
1424              result.complete(ret);
1425            }
1426
1427          });
1428        }
1429      });
1430    return result;
1431  }
1432
1433  @Override
1434  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1435    CompletableFuture<Void> future = new CompletableFuture<>();
1436    addListener(getRegionLocation(regionName), (location, err) -> {
1437      if (err != null) {
1438        future.completeExceptionally(err);
1439        return;
1440      }
1441      RegionInfo regionInfo = location.getRegion();
1442      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1443        future
1444          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1445            "Replicas are auto-split when their primary is split."));
1446        return;
1447      }
1448      ServerName serverName = location.getServerName();
1449      if (serverName == null) {
1450        future
1451          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1452        return;
1453      }
1454      addListener(split(regionInfo, null), (ret, err2) -> {
1455        if (err2 != null) {
1456          future.completeExceptionally(err2);
1457        } else {
1458          future.complete(ret);
1459        }
1460      });
1461    });
1462    return future;
1463  }
1464
1465  @Override
1466  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1467    Preconditions.checkNotNull(splitPoint,
1468      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1469    CompletableFuture<Void> future = new CompletableFuture<>();
1470    addListener(getRegionLocation(regionName), (location, err) -> {
1471      if (err != null) {
1472        future.completeExceptionally(err);
1473        return;
1474      }
1475      RegionInfo regionInfo = location.getRegion();
1476      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1477        future
1478          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1479            "Replicas are auto-split when their primary is split."));
1480        return;
1481      }
1482      ServerName serverName = location.getServerName();
1483      if (serverName == null) {
1484        future
1485          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1486        return;
1487      }
1488      if (regionInfo.getStartKey() != null &&
1489        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1490        future.completeExceptionally(
1491          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1492        return;
1493      }
1494      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1495        if (err2 != null) {
1496          future.completeExceptionally(err2);
1497        } else {
1498          future.complete(ret);
1499        }
1500      });
1501    });
1502    return future;
1503  }
1504
1505  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1506    CompletableFuture<Void> future = new CompletableFuture<>();
1507    TableName tableName = hri.getTable();
1508    final SplitTableRegionRequest request;
1509    try {
1510      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1511        ng.newNonce());
1512    } catch (DeserializationException e) {
1513      future.completeExceptionally(e);
1514      return future;
1515    }
1516
1517    addListener(
1518      this.procedureCall(tableName,
1519        request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
1520        new SplitTableRegionProcedureBiConsumer(tableName)),
1521      (ret, err2) -> {
1522        if (err2 != null) {
1523          future.completeExceptionally(err2);
1524        } else {
1525          future.complete(ret);
1526        }
1527      });
1528    return future;
1529  }
1530
1531  @Override
1532  public CompletableFuture<Void> assign(byte[] regionName) {
1533    CompletableFuture<Void> future = new CompletableFuture<>();
1534    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1535      if (err != null) {
1536        future.completeExceptionally(err);
1537        return;
1538      }
1539      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1540        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1541          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1542          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1543        .call(), (ret, err2) -> {
1544          if (err2 != null) {
1545            future.completeExceptionally(err2);
1546          } else {
1547            future.complete(ret);
1548          }
1549        });
1550    });
1551    return future;
1552  }
1553
1554  @Override
1555  public CompletableFuture<Void> unassign(byte[] regionName) {
1556    CompletableFuture<Void> future = new CompletableFuture<>();
1557    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1558      if (err != null) {
1559        future.completeExceptionally(err);
1560        return;
1561      }
1562      addListener(
1563        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1564          .action(((controller, stub) -> this
1565            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1566              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1567              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1568          .call(),
1569        (ret, err2) -> {
1570          if (err2 != null) {
1571            future.completeExceptionally(err2);
1572          } else {
1573            future.complete(ret);
1574          }
1575        });
1576    });
1577    return future;
1578  }
1579
1580  @Override
1581  public CompletableFuture<Void> offline(byte[] regionName) {
1582    CompletableFuture<Void> future = new CompletableFuture<>();
1583    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1584      if (err != null) {
1585        future.completeExceptionally(err);
1586        return;
1587      }
1588      addListener(
1589        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1590          .action(((controller, stub) -> this
1591            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1592              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1593              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1594          .call(),
1595        (ret, err2) -> {
1596          if (err2 != null) {
1597            future.completeExceptionally(err2);
1598          } else {
1599            future.complete(ret);
1600          }
1601        });
1602    });
1603    return future;
1604  }
1605
1606  @Override
1607  public CompletableFuture<Void> move(byte[] regionName) {
1608    CompletableFuture<Void> future = new CompletableFuture<>();
1609    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1610      if (err != null) {
1611        future.completeExceptionally(err);
1612        return;
1613      }
1614      addListener(
1615        moveRegion(regionInfo,
1616          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1617        (ret, err2) -> {
1618          if (err2 != null) {
1619            future.completeExceptionally(err2);
1620          } else {
1621            future.complete(ret);
1622          }
1623        });
1624    });
1625    return future;
1626  }
1627
1628  @Override
1629  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1630    Preconditions.checkNotNull(destServerName,
1631      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1632    CompletableFuture<Void> future = new CompletableFuture<>();
1633    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1634      if (err != null) {
1635        future.completeExceptionally(err);
1636        return;
1637      }
1638      addListener(
1639        moveRegion(regionInfo, RequestConverter
1640          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1641        (ret, err2) -> {
1642          if (err2 != null) {
1643            future.completeExceptionally(err2);
1644          } else {
1645            future.complete(ret);
1646          }
1647        });
1648    });
1649    return future;
1650  }
1651
1652  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1653    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1654      .action(
1655        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1656          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1657      .call();
1658  }
1659
1660  @Override
1661  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1662    return this
1663        .<Void> newMasterCaller()
1664        .action(
1665          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1666            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1667            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1668  }
1669
1670  @Override
1671  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1672    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1673    Scan scan = QuotaTableUtil.makeScan(filter);
1674    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1675        .scan(scan, new AdvancedScanResultConsumer() {
1676          List<QuotaSettings> settings = new ArrayList<>();
1677
1678          @Override
1679          public void onNext(Result[] results, ScanController controller) {
1680            for (Result result : results) {
1681              try {
1682                QuotaTableUtil.parseResultToCollection(result, settings);
1683              } catch (IOException e) {
1684                controller.terminate();
1685                future.completeExceptionally(e);
1686              }
1687            }
1688          }
1689
1690          @Override
1691          public void onError(Throwable error) {
1692            future.completeExceptionally(error);
1693          }
1694
1695          @Override
1696          public void onComplete() {
1697            future.complete(settings);
1698          }
1699        });
1700    return future;
1701  }
1702
1703  @Override
1704  public CompletableFuture<Void> addReplicationPeer(String peerId,
1705      ReplicationPeerConfig peerConfig, boolean enabled) {
1706    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1707      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1708      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1709      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1710  }
1711
1712  @Override
1713  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1714    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1715      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1716      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1717      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1718  }
1719
1720  @Override
1721  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1722    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1723      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1724      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1725      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1726  }
1727
1728  @Override
1729  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1730    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1731      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1732      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1733      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1734  }
1735
1736  @Override
1737  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1738    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1739      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
1740          call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1741            (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1742            (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1743  }
1744
1745  @Override
1746  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1747      ReplicationPeerConfig peerConfig) {
1748    return this
1749      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1750        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1751        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1752        (resp) -> resp.getProcId(),
1753        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1754  }
1755
1756  @Override
1757  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1758      SyncReplicationState clusterState) {
1759    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1760        TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1761        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1762          clusterState),
1763          (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1764          (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1765            () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1766  }
1767
1768  @Override
1769  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1770      Map<TableName, List<String>> tableCfs) {
1771    if (tableCfs == null) {
1772      return failedFuture(new ReplicationException("tableCfs is null"));
1773    }
1774
1775    CompletableFuture<Void> future = new CompletableFuture<Void>();
1776    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1777      if (!completeExceptionally(future, error)) {
1778        ReplicationPeerConfig newPeerConfig =
1779          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1780        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1781          if (!completeExceptionally(future, error)) {
1782            future.complete(result);
1783          }
1784        });
1785      }
1786    });
1787    return future;
1788  }
1789
1790  @Override
1791  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1792      Map<TableName, List<String>> tableCfs) {
1793    if (tableCfs == null) {
1794      return failedFuture(new ReplicationException("tableCfs is null"));
1795    }
1796
1797    CompletableFuture<Void> future = new CompletableFuture<Void>();
1798    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1799      if (!completeExceptionally(future, error)) {
1800        ReplicationPeerConfig newPeerConfig = null;
1801        try {
1802          newPeerConfig = ReplicationPeerConfigUtil
1803            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1804        } catch (ReplicationException e) {
1805          future.completeExceptionally(e);
1806          return;
1807        }
1808        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1809          if (!completeExceptionally(future, error)) {
1810            future.complete(result);
1811          }
1812        });
1813      }
1814    });
1815    return future;
1816  }
1817
1818  @Override
1819  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1820    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1821  }
1822
1823  @Override
1824  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1825    Preconditions.checkNotNull(pattern,
1826      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1827    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1828  }
1829
1830  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1831      ListReplicationPeersRequest request) {
1832    return this
1833        .<List<ReplicationPeerDescription>> newMasterCaller()
1834        .action(
1835          (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1836              List<ReplicationPeerDescription>> call(controller, stub, request,
1837                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1838                (resp) -> resp.getPeerDescList().stream()
1839                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1840                    .collect(Collectors.toList()))).call();
1841  }
1842
1843  @Override
1844  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1845    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1846    addListener(listTableDescriptors(), (tables, error) -> {
1847      if (!completeExceptionally(future, error)) {
1848        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1849        tables.forEach(table -> {
1850          Map<String, Integer> cfs = new HashMap<>();
1851          Stream.of(table.getColumnFamilies())
1852            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1853            .forEach(column -> {
1854              cfs.put(column.getNameAsString(), column.getScope());
1855            });
1856          if (!cfs.isEmpty()) {
1857            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1858          }
1859        });
1860        future.complete(replicatedTableCFs);
1861      }
1862    });
1863    return future;
1864  }
1865
1866  @Override
1867  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1868    SnapshotProtos.SnapshotDescription snapshot =
1869      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1870    try {
1871      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1872    } catch (IllegalArgumentException e) {
1873      return failedFuture(e);
1874    }
1875    CompletableFuture<Void> future = new CompletableFuture<>();
1876    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1877    addListener(this.<Long> newMasterCaller()
1878      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1879        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1880        resp -> resp.getExpectedTimeout()))
1881      .call(), (expectedTimeout, err) -> {
1882        if (err != null) {
1883          future.completeExceptionally(err);
1884          return;
1885        }
1886        TimerTask pollingTask = new TimerTask() {
1887          int tries = 0;
1888          long startTime = EnvironmentEdgeManager.currentTime();
1889          long endTime = startTime + expectedTimeout;
1890          long maxPauseTime = expectedTimeout / maxAttempts;
1891
1892          @Override
1893          public void run(Timeout timeout) throws Exception {
1894            if (EnvironmentEdgeManager.currentTime() < endTime) {
1895              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1896                if (err2 != null) {
1897                  future.completeExceptionally(err2);
1898                } else if (done) {
1899                  future.complete(null);
1900                } else {
1901                  // retry again after pauseTime.
1902                  long pauseTime =
1903                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1904                  pauseTime = Math.min(pauseTime, maxPauseTime);
1905                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1906                    TimeUnit.MILLISECONDS);
1907                }
1908              });
1909            } else {
1910              future.completeExceptionally(
1911                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1912                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1913            }
1914          }
1915        };
1916        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1917      });
1918    return future;
1919  }
1920
1921  @Override
1922  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1923    return this
1924        .<Boolean> newMasterCaller()
1925        .action(
1926          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1927            controller,
1928            stub,
1929            IsSnapshotDoneRequest.newBuilder()
1930                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1931                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1932  }
1933
1934  @Override
1935  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1936    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1937      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1938      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1939    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1940  }
1941
1942  @Override
1943  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1944      boolean restoreAcl) {
1945    CompletableFuture<Void> future = new CompletableFuture<>();
1946    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1947      if (err != null) {
1948        future.completeExceptionally(err);
1949        return;
1950      }
1951      TableName tableName = null;
1952      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1953        for (SnapshotDescription snap : snapshotDescriptions) {
1954          if (snap.getName().equals(snapshotName)) {
1955            tableName = snap.getTableName();
1956            break;
1957          }
1958        }
1959      }
1960      if (tableName == null) {
1961        future.completeExceptionally(new RestoreSnapshotException(
1962          "Unable to find the table name for snapshot=" + snapshotName));
1963        return;
1964      }
1965      final TableName finalTableName = tableName;
1966      addListener(tableExists(finalTableName), (exists, err2) -> {
1967        if (err2 != null) {
1968          future.completeExceptionally(err2);
1969        } else if (!exists) {
1970          // if table does not exist, then just clone snapshot into new table.
1971          completeConditionalOnFuture(future,
1972            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1973        } else {
1974          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1975            if (err4 != null) {
1976              future.completeExceptionally(err4);
1977            } else if (!disabled) {
1978              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1979            } else {
1980              completeConditionalOnFuture(future,
1981                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1982            }
1983          });
1984        }
1985      });
1986    });
1987    return future;
1988  }
1989
1990  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1991      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1992    if (takeFailSafeSnapshot) {
1993      CompletableFuture<Void> future = new CompletableFuture<>();
1994      // Step.1 Take a snapshot of the current state
1995      String failSafeSnapshotSnapshotNameFormat =
1996        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1997          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1998      final String failSafeSnapshotSnapshotName =
1999        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2000          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2001          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2002      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2003      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2004        if (err != null) {
2005          future.completeExceptionally(err);
2006        } else {
2007          // Step.2 Restore snapshot
2008          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
2009            (void2, err2) -> {
2010              if (err2 != null) {
2011                // Step.3.a Something went wrong during the restore and try to rollback.
2012                addListener(
2013                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
2014                  (void3, err3) -> {
2015                    if (err3 != null) {
2016                      future.completeExceptionally(err3);
2017                    } else {
2018                      String msg =
2019                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
2020                          failSafeSnapshotSnapshotName + " succeeded.";
2021                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2022                    }
2023                  });
2024              } else {
2025                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2026                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2027                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2028                  if (err3 != null) {
2029                    LOG.error(
2030                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2031                      err3);
2032                  }
2033                  future.complete(ret3);
2034                });
2035              }
2036            });
2037        }
2038      });
2039      return future;
2040    } else {
2041      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
2042    }
2043  }
2044
2045  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2046      CompletableFuture<T> parentFuture) {
2047    addListener(parentFuture, (res, err) -> {
2048      if (err != null) {
2049        dependentFuture.completeExceptionally(err);
2050      } else {
2051        dependentFuture.complete(res);
2052      }
2053    });
2054  }
2055
2056  @Override
2057  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2058      boolean restoreAcl) {
2059    CompletableFuture<Void> future = new CompletableFuture<>();
2060    addListener(tableExists(tableName), (exists, err) -> {
2061      if (err != null) {
2062        future.completeExceptionally(err);
2063      } else if (exists) {
2064        future.completeExceptionally(new TableExistsException(tableName));
2065      } else {
2066        completeConditionalOnFuture(future,
2067          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2068      }
2069    });
2070    return future;
2071  }
2072
2073  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2074      boolean restoreAcl) {
2075    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2076      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2077    try {
2078      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2079    } catch (IllegalArgumentException e) {
2080      return failedFuture(e);
2081    }
2082    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2083      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2084        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2085          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2086        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2087      .call());
2088  }
2089
2090  @Override
2091  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2092    return getCompletedSnapshots(null);
2093  }
2094
2095  @Override
2096  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2097    Preconditions.checkNotNull(pattern,
2098      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2099    return getCompletedSnapshots(pattern);
2100  }
2101
2102  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2103    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2104        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2105        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2106          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2107          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2108        .call();
2109  }
2110
2111  @Override
2112  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2113    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2114        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2115    return getCompletedSnapshots(tableNamePattern, null);
2116  }
2117
2118  @Override
2119  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2120      Pattern snapshotNamePattern) {
2121    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2122        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2123    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2124        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2125    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2126  }
2127
2128  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2129      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2130    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2131    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2132      if (err != null) {
2133        future.completeExceptionally(err);
2134        return;
2135      }
2136      if (tableNames == null || tableNames.size() <= 0) {
2137        future.complete(Collections.emptyList());
2138        return;
2139      }
2140      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2141        if (err2 != null) {
2142          future.completeExceptionally(err2);
2143          return;
2144        }
2145        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2146          future.complete(Collections.emptyList());
2147          return;
2148        }
2149        future.complete(snapshotDescList.stream()
2150          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2151          .collect(Collectors.toList()));
2152      });
2153    });
2154    return future;
2155  }
2156
2157  @Override
2158  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2159    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2160  }
2161
2162  @Override
2163  public CompletableFuture<Void> deleteSnapshots() {
2164    return internalDeleteSnapshots(null, null);
2165  }
2166
2167  @Override
2168  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2169    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2170        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2171    return internalDeleteSnapshots(null, snapshotNamePattern);
2172  }
2173
2174  @Override
2175  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2176    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2177        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2178    return internalDeleteSnapshots(tableNamePattern, null);
2179  }
2180
2181  @Override
2182  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2183      Pattern snapshotNamePattern) {
2184    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2185        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2186    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2187        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2188    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2189  }
2190
2191  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2192      Pattern snapshotNamePattern) {
2193    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2194    if (tableNamePattern == null) {
2195      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2196    } else {
2197      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2198    }
2199    CompletableFuture<Void> future = new CompletableFuture<>();
2200    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2201      if (err != null) {
2202        future.completeExceptionally(err);
2203        return;
2204      }
2205      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2206        future.complete(null);
2207        return;
2208      }
2209      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2210        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2211          if (e != null) {
2212            future.completeExceptionally(e);
2213          } else {
2214            future.complete(v);
2215          }
2216        });
2217    }));
2218    return future;
2219  }
2220
2221  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2222    return this
2223        .<Void> newMasterCaller()
2224        .action(
2225          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2226            controller,
2227            stub,
2228            DeleteSnapshotRequest.newBuilder()
2229                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2230                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2231  }
2232
2233  @Override
2234  public CompletableFuture<Void> execProcedure(String signature, String instance,
2235      Map<String, String> props) {
2236    CompletableFuture<Void> future = new CompletableFuture<>();
2237    ProcedureDescription procDesc =
2238      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2239    addListener(this.<Long> newMasterCaller()
2240      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2241        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2242        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2243      .call(), (expectedTimeout, err) -> {
2244        if (err != null) {
2245          future.completeExceptionally(err);
2246          return;
2247        }
2248        TimerTask pollingTask = new TimerTask() {
2249          int tries = 0;
2250          long startTime = EnvironmentEdgeManager.currentTime();
2251          long endTime = startTime + expectedTimeout;
2252          long maxPauseTime = expectedTimeout / maxAttempts;
2253
2254          @Override
2255          public void run(Timeout timeout) throws Exception {
2256            if (EnvironmentEdgeManager.currentTime() < endTime) {
2257              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2258                if (err2 != null) {
2259                  future.completeExceptionally(err2);
2260                  return;
2261                }
2262                if (done) {
2263                  future.complete(null);
2264                } else {
2265                  // retry again after pauseTime.
2266                  long pauseTime =
2267                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2268                  pauseTime = Math.min(pauseTime, maxPauseTime);
2269                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2270                    TimeUnit.MICROSECONDS);
2271                }
2272              });
2273            } else {
2274              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2275                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2276            }
2277          }
2278        };
2279        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2280        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2281      });
2282    return future;
2283  }
2284
2285  @Override
2286  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2287      Map<String, String> props) {
2288    ProcedureDescription proDesc =
2289        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2290    return this.<byte[]> newMasterCaller()
2291        .action(
2292          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2293            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2294            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2295            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2296        .call();
2297  }
2298
2299  @Override
2300  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2301      Map<String, String> props) {
2302    ProcedureDescription proDesc =
2303        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2304    return this.<Boolean> newMasterCaller()
2305        .action((controller, stub) -> this
2306            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2307              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2308              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2309        .call();
2310  }
2311
2312  @Override
2313  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2314    return this.<Boolean> newMasterCaller().action(
2315      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2316        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2317        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2318        .call();
2319  }
2320
2321  @Override
2322  public CompletableFuture<String> getProcedures() {
2323    return this
2324        .<String> newMasterCaller()
2325        .action(
2326          (controller, stub) -> this
2327              .<GetProceduresRequest, GetProceduresResponse, String> call(
2328                controller, stub, GetProceduresRequest.newBuilder().build(),
2329                (s, c, req, done) -> s.getProcedures(c, req, done),
2330                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2331  }
2332
2333  @Override
2334  public CompletableFuture<String> getLocks() {
2335    return this
2336        .<String> newMasterCaller()
2337        .action(
2338          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2339            controller, stub, GetLocksRequest.newBuilder().build(),
2340            (s, c, req, done) -> s.getLocks(c, req, done),
2341            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2342  }
2343
2344  @Override
2345  public CompletableFuture<Void> decommissionRegionServers(
2346      List<ServerName> servers, boolean offload) {
2347    return this.<Void> newMasterCaller()
2348        .action((controller, stub) -> this
2349          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2350            controller, stub,
2351              RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2352            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2353        .call();
2354  }
2355
2356  @Override
2357  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2358    return this.<List<ServerName>> newMasterCaller()
2359        .action((controller, stub) -> this
2360          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2361            List<ServerName>> call(
2362              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2363              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2364              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2365                  .collect(Collectors.toList())))
2366        .call();
2367  }
2368
2369  @Override
2370  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2371      List<byte[]> encodedRegionNames) {
2372    return this.<Void> newMasterCaller()
2373        .action((controller, stub) ->
2374            this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2375                controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2376                    server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2377                        c, req, done), resp -> null)).call();
2378  }
2379
2380  /**
2381   * Get the region location for the passed region name. The region name may be a full region name
2382   * or encoded region name. If the region does not found, then it'll throw an
2383   * UnknownRegionException wrapped by a {@link CompletableFuture}
2384   * @param regionNameOrEncodedRegionName region name or encoded region name
2385   * @return region location, wrapped by a {@link CompletableFuture}
2386   */
2387  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2388    if (regionNameOrEncodedRegionName == null) {
2389      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2390    }
2391
2392    CompletableFuture<Optional<HRegionLocation>> future;
2393    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2394      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2395      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2396        // old format encodedName, should be meta region
2397        future = connection.registry.getMetaRegionLocations()
2398          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2399            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2400      } else {
2401        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2402          regionNameOrEncodedRegionName);
2403      }
2404    } else {
2405      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2406      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2407      RegionInfo regionInfo;
2408      try {
2409        regionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(
2410          regionNameOrEncodedRegionName);
2411      } catch (IOException ioe) {
2412        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2413      }
2414
2415      if (regionInfo.isMetaRegion()) {
2416        future = connection.registry.getMetaRegionLocations()
2417          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2418            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2419            .findFirst());
2420      } else {
2421        future =
2422          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2423      }
2424    }
2425
2426    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2427    addListener(future, (location, err) -> {
2428      if (err != null) {
2429        returnedFuture.completeExceptionally(err);
2430        return;
2431      }
2432      if (!location.isPresent() || location.get().getRegion() == null) {
2433        returnedFuture.completeExceptionally(
2434          new UnknownRegionException("Invalid region name or encoded region name: " +
2435            Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2436      } else {
2437        returnedFuture.complete(location.get());
2438      }
2439    });
2440    return returnedFuture;
2441  }
2442
2443  /**
2444   * Get the region info for the passed region name. The region name may be a full region name or
2445   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2446   * wrapped by a {@link CompletableFuture}
2447   * @return region info, wrapped by a {@link CompletableFuture}
2448   */
2449  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2450    if (regionNameOrEncodedRegionName == null) {
2451      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2452    }
2453
2454    if (Bytes.equals(regionNameOrEncodedRegionName,
2455      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2456      Bytes.equals(regionNameOrEncodedRegionName,
2457        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2458      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2459    }
2460
2461    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2462    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2463      if (err != null) {
2464        future.completeExceptionally(err);
2465      } else {
2466        future.complete(location.getRegion());
2467      }
2468    });
2469    return future;
2470  }
2471
2472  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2473    if (numRegions < 3) {
2474      throw new IllegalArgumentException("Must create at least three regions");
2475    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2476      throw new IllegalArgumentException("Start key must be smaller than end key");
2477    }
2478    if (numRegions == 3) {
2479      return new byte[][] { startKey, endKey };
2480    }
2481    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2482    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2483      throw new IllegalArgumentException("Unable to split key range into enough regions");
2484    }
2485    return splitKeys;
2486  }
2487
2488  private void verifySplitKeys(byte[][] splitKeys) {
2489    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2490    // Verify there are no duplicate split keys
2491    byte[] lastKey = null;
2492    for (byte[] splitKey : splitKeys) {
2493      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2494        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2495      }
2496      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2497        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2498            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2499      }
2500      lastKey = splitKey;
2501    }
2502  }
2503
2504  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2505
2506    abstract void onFinished();
2507
2508    abstract void onError(Throwable error);
2509
2510    @Override
2511    public void accept(Void v, Throwable error) {
2512      if (error != null) {
2513        onError(error);
2514        return;
2515      }
2516      onFinished();
2517    }
2518  }
2519
2520  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2521    protected final TableName tableName;
2522
2523    TableProcedureBiConsumer(TableName tableName) {
2524      this.tableName = tableName;
2525    }
2526
2527    abstract String getOperationType();
2528
2529    String getDescription() {
2530      return "Operation: " + getOperationType() + ", " + "Table Name: "
2531          + tableName.getNameWithNamespaceInclAsString();
2532    }
2533
2534    @Override
2535    void onFinished() {
2536      LOG.info(getDescription() + " completed");
2537    }
2538
2539    @Override
2540    void onError(Throwable error) {
2541      LOG.info(getDescription() + " failed with " + error.getMessage());
2542    }
2543  }
2544
2545  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2546    protected final String namespaceName;
2547
2548    NamespaceProcedureBiConsumer(String namespaceName) {
2549      this.namespaceName = namespaceName;
2550    }
2551
2552    abstract String getOperationType();
2553
2554    String getDescription() {
2555      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2556    }
2557
2558    @Override
2559    void onFinished() {
2560      LOG.info(getDescription() + " completed");
2561    }
2562
2563    @Override
2564    void onError(Throwable error) {
2565      LOG.info(getDescription() + " failed with " + error.getMessage());
2566    }
2567  }
2568
2569  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2570
2571    CreateTableProcedureBiConsumer(TableName tableName) {
2572      super(tableName);
2573    }
2574
2575    @Override
2576    String getOperationType() {
2577      return "CREATE";
2578    }
2579  }
2580
2581  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2582
2583    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2584      super(tableName);
2585    }
2586
2587    @Override
2588    String getOperationType() {
2589      return "ENABLE";
2590    }
2591  }
2592
2593  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2594
2595    DeleteTableProcedureBiConsumer(TableName tableName) {
2596      super(tableName);
2597    }
2598
2599    @Override
2600    String getOperationType() {
2601      return "DELETE";
2602    }
2603
2604    @Override
2605    void onFinished() {
2606      connection.getLocator().clearCache(this.tableName);
2607      super.onFinished();
2608    }
2609  }
2610
2611  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2612
2613    TruncateTableProcedureBiConsumer(TableName tableName) {
2614      super(tableName);
2615    }
2616
2617    @Override
2618    String getOperationType() {
2619      return "TRUNCATE";
2620    }
2621  }
2622
2623  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2624
2625    EnableTableProcedureBiConsumer(TableName tableName) {
2626      super(tableName);
2627    }
2628
2629    @Override
2630    String getOperationType() {
2631      return "ENABLE";
2632    }
2633  }
2634
2635  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2636
2637    DisableTableProcedureBiConsumer(TableName tableName) {
2638      super(tableName);
2639    }
2640
2641    @Override
2642    String getOperationType() {
2643      return "DISABLE";
2644    }
2645  }
2646
2647  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2648
2649    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2650      super(tableName);
2651    }
2652
2653    @Override
2654    String getOperationType() {
2655      return "ADD_COLUMN_FAMILY";
2656    }
2657  }
2658
2659  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2660
2661    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2662      super(tableName);
2663    }
2664
2665    @Override
2666    String getOperationType() {
2667      return "DELETE_COLUMN_FAMILY";
2668    }
2669  }
2670
2671  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2672
2673    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2674      super(tableName);
2675    }
2676
2677    @Override
2678    String getOperationType() {
2679      return "MODIFY_COLUMN_FAMILY";
2680    }
2681  }
2682
2683  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2684
2685    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2686      super(namespaceName);
2687    }
2688
2689    @Override
2690    String getOperationType() {
2691      return "CREATE_NAMESPACE";
2692    }
2693  }
2694
2695  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2696
2697    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2698      super(namespaceName);
2699    }
2700
2701    @Override
2702    String getOperationType() {
2703      return "DELETE_NAMESPACE";
2704    }
2705  }
2706
2707  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2708
2709    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2710      super(namespaceName);
2711    }
2712
2713    @Override
2714    String getOperationType() {
2715      return "MODIFY_NAMESPACE";
2716    }
2717  }
2718
2719  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2720
2721    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2722      super(tableName);
2723    }
2724
2725    @Override
2726    String getOperationType() {
2727      return "MERGE_REGIONS";
2728    }
2729  }
2730
2731  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2732
2733    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2734      super(tableName);
2735    }
2736
2737    @Override
2738    String getOperationType() {
2739      return "SPLIT_REGION";
2740    }
2741  }
2742
2743  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2744    private final String peerId;
2745    private final Supplier<String> getOperation;
2746
2747    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2748      this.peerId = peerId;
2749      this.getOperation = getOperation;
2750    }
2751
2752    String getDescription() {
2753      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2754    }
2755
2756    @Override
2757    void onFinished() {
2758      LOG.info(getDescription() + " completed");
2759    }
2760
2761    @Override
2762    void onError(Throwable error) {
2763      LOG.info(getDescription() + " failed with " + error.getMessage());
2764    }
2765  }
2766
2767  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2768    CompletableFuture<Void> future = new CompletableFuture<>();
2769    addListener(procFuture, (procId, error) -> {
2770      if (error != null) {
2771        future.completeExceptionally(error);
2772        return;
2773      }
2774      getProcedureResult(procId, future, 0);
2775    });
2776    return future;
2777  }
2778
2779  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2780    addListener(
2781      this.<GetProcedureResultResponse> newMasterCaller()
2782        .action((controller, stub) -> this
2783          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2784            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2785            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2786        .call(),
2787      (response, error) -> {
2788        if (error != null) {
2789          LOG.warn("failed to get the procedure result procId={}", procId,
2790            ConnectionUtils.translateException(error));
2791          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2792            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2793          return;
2794        }
2795        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2796          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2797            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2798          return;
2799        }
2800        if (response.hasException()) {
2801          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2802          future.completeExceptionally(ioe);
2803        } else {
2804          future.complete(null);
2805        }
2806      });
2807  }
2808
2809  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2810    CompletableFuture<T> future = new CompletableFuture<>();
2811    future.completeExceptionally(error);
2812    return future;
2813  }
2814
2815  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2816    if (error != null) {
2817      future.completeExceptionally(error);
2818      return true;
2819    }
2820    return false;
2821  }
2822
2823  @Override
2824  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2825    return getClusterMetrics(EnumSet.allOf(Option.class));
2826  }
2827
2828  @Override
2829  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2830    return this
2831        .<ClusterMetrics> newMasterCaller()
2832        .action(
2833          (controller, stub) -> this
2834              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2835                stub, RequestConverter.buildGetClusterStatusRequest(options),
2836                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2837                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2838  }
2839
2840  @Override
2841  public CompletableFuture<Void> shutdown() {
2842    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2843      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2844        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2845        resp -> null))
2846      .call();
2847  }
2848
2849  @Override
2850  public CompletableFuture<Void> stopMaster() {
2851    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2852      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2853        controller, stub, StopMasterRequest.newBuilder().build(),
2854        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2855      .call();
2856  }
2857
2858  @Override
2859  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2860    StopServerRequest request = RequestConverter
2861      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2862    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2863      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2864        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2865        resp -> null))
2866      .serverName(serverName).call();
2867  }
2868
2869  @Override
2870  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2871    return this
2872        .<Void> newAdminCaller()
2873        .action(
2874          (controller, stub) -> this
2875              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2876                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2877                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2878        .serverName(serverName).call();
2879  }
2880
2881  @Override
2882  public CompletableFuture<Void> updateConfiguration() {
2883    CompletableFuture<Void> future = new CompletableFuture<Void>();
2884    addListener(
2885      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2886      (status, err) -> {
2887        if (err != null) {
2888          future.completeExceptionally(err);
2889        } else {
2890          List<CompletableFuture<Void>> futures = new ArrayList<>();
2891          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2892          futures.add(updateConfiguration(status.getMasterName()));
2893          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2894          addListener(
2895            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2896            (result, err2) -> {
2897              if (err2 != null) {
2898                future.completeExceptionally(err2);
2899              } else {
2900                future.complete(result);
2901              }
2902            });
2903        }
2904      });
2905    return future;
2906  }
2907
2908  @Override
2909  public CompletableFuture<Void> updateConfiguration(String groupName) {
2910    CompletableFuture<Void> future = new CompletableFuture<Void>();
2911    addListener(
2912      getRSGroup(groupName),
2913      (rsGroupInfo, err) -> {
2914        if (err != null) {
2915          future.completeExceptionally(err);
2916        } else if (rsGroupInfo == null) {
2917          future.completeExceptionally(
2918            new IllegalArgumentException("Group does not exist: " + groupName));
2919        } else {
2920          addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
2921            if (err2 != null) {
2922              future.completeExceptionally(err2);
2923            } else {
2924              List<CompletableFuture<Void>> futures = new ArrayList<>();
2925              List<ServerName> groupServers = status.getServersName().stream().filter(
2926                s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
2927              groupServers.forEach(server -> futures.add(updateConfiguration(server)));
2928              addListener(
2929                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2930                (result, err3) -> {
2931                  if (err3 != null) {
2932                    future.completeExceptionally(err3);
2933                  } else {
2934                    future.complete(result);
2935                  }
2936                });
2937            }
2938          });
2939        }
2940      });
2941    return future;
2942  }
2943
2944  @Override
2945  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2946    return this
2947        .<Void> newAdminCaller()
2948        .action(
2949          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2950            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2951            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2952        .serverName(serverName).call();
2953  }
2954
2955  @Override
2956  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2957    return this
2958        .<Void> newAdminCaller()
2959        .action(
2960          (controller, stub) -> this
2961              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2962                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2963                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2964        .serverName(serverName).call();
2965  }
2966
2967  @Override
2968  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2969    return this
2970        .<List<SecurityCapability>> newMasterCaller()
2971        .action(
2972          (controller, stub) -> this
2973              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
2974                  call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
2975                    (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2976                    (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2977        .call();
2978  }
2979
2980  @Override
2981  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2982    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2983  }
2984
2985  @Override
2986  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2987      TableName tableName) {
2988    Preconditions.checkNotNull(tableName,
2989      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2990    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2991  }
2992
2993  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2994      ServerName serverName) {
2995    return this.<List<RegionMetrics>> newAdminCaller()
2996        .action((controller, stub) -> this
2997            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2998              adminCall(controller, stub, request, (s, c, req, done) ->
2999                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
3000        .serverName(serverName).call();
3001  }
3002
3003  @Override
3004  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3005    return this
3006        .<Boolean> newMasterCaller()
3007        .action(
3008          (controller, stub) -> this
3009              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
3010                stub, IsInMaintenanceModeRequest.newBuilder().build(),
3011                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3012                resp -> resp.getInMaintenanceMode())).call();
3013  }
3014
3015  @Override
3016  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3017      CompactType compactType) {
3018    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3019
3020    switch (compactType) {
3021      case MOB:
3022        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3023          if (err != null) {
3024            future.completeExceptionally(err);
3025            return;
3026          }
3027          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3028
3029          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3030            .action((controller, stub) -> this
3031              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3032                controller, stub,
3033                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3034                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3035            .call(), (resp2, err2) -> {
3036              if (err2 != null) {
3037                future.completeExceptionally(err2);
3038              } else {
3039                if (resp2.hasCompactionState()) {
3040                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3041                } else {
3042                  future.complete(CompactionState.NONE);
3043                }
3044              }
3045            });
3046        });
3047        break;
3048      case NORMAL:
3049        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3050          if (err != null) {
3051            future.completeExceptionally(err);
3052            return;
3053          }
3054          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3055          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3056          locations.stream().filter(loc -> loc.getServerName() != null)
3057            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3058            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3059              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3060                // If any region compaction state is MAJOR_AND_MINOR
3061                // the table compaction state is MAJOR_AND_MINOR, too.
3062                if (err2 != null) {
3063                  future.completeExceptionally(unwrapCompletionException(err2));
3064                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3065                  future.complete(regionState);
3066                } else {
3067                  regionStates.add(regionState);
3068                }
3069              }));
3070            });
3071          addListener(
3072            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3073            (ret, err3) -> {
3074              // If future not completed, check all regions's compaction state
3075              if (!future.isCompletedExceptionally() && !future.isDone()) {
3076                CompactionState state = CompactionState.NONE;
3077                for (CompactionState regionState : regionStates) {
3078                  switch (regionState) {
3079                    case MAJOR:
3080                      if (state == CompactionState.MINOR) {
3081                        future.complete(CompactionState.MAJOR_AND_MINOR);
3082                      } else {
3083                        state = CompactionState.MAJOR;
3084                      }
3085                      break;
3086                    case MINOR:
3087                      if (state == CompactionState.MAJOR) {
3088                        future.complete(CompactionState.MAJOR_AND_MINOR);
3089                      } else {
3090                        state = CompactionState.MINOR;
3091                      }
3092                      break;
3093                    case NONE:
3094                    default:
3095                  }
3096                }
3097                if (!future.isDone()) {
3098                  future.complete(state);
3099                }
3100              }
3101            });
3102        });
3103        break;
3104      default:
3105        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3106    }
3107
3108    return future;
3109  }
3110
3111  @Override
3112  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3113    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3114    addListener(getRegionLocation(regionName), (location, err) -> {
3115      if (err != null) {
3116        future.completeExceptionally(err);
3117        return;
3118      }
3119      ServerName serverName = location.getServerName();
3120      if (serverName == null) {
3121        future
3122          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3123        return;
3124      }
3125      addListener(
3126        this.<GetRegionInfoResponse> newAdminCaller()
3127          .action((controller, stub) -> this
3128            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3129              controller, stub,
3130              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3131                true),
3132              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3133          .serverName(serverName).call(),
3134        (resp2, err2) -> {
3135          if (err2 != null) {
3136            future.completeExceptionally(err2);
3137          } else {
3138            if (resp2.hasCompactionState()) {
3139              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3140            } else {
3141              future.complete(CompactionState.NONE);
3142            }
3143          }
3144        });
3145    });
3146    return future;
3147  }
3148
3149  @Override
3150  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3151    MajorCompactionTimestampRequest request =
3152        MajorCompactionTimestampRequest.newBuilder()
3153            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3154    return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3155        this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3156            call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3157                c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3158  }
3159
3160  @Override
3161  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3162      byte[] regionName) {
3163    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3164    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3165    addListener(getRegionInfo(regionName), (region, err) -> {
3166      if (err != null) {
3167        future.completeExceptionally(err);
3168        return;
3169      }
3170      MajorCompactionTimestampForRegionRequest.Builder builder =
3171        MajorCompactionTimestampForRegionRequest.newBuilder();
3172      builder.setRegion(
3173        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3174      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3175        .<MajorCompactionTimestampForRegionRequest,
3176        MajorCompactionTimestampResponse, Optional<Long>> call(
3177          controller, stub, builder.build(),
3178          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3179          ProtobufUtil::toOptionalTimestamp))
3180        .call(), (timestamp, err2) -> {
3181          if (err2 != null) {
3182            future.completeExceptionally(err2);
3183          } else {
3184            future.complete(timestamp);
3185          }
3186        });
3187    });
3188    return future;
3189  }
3190
3191  @Override
3192  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3193      List<String> serverNamesList) {
3194    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3195    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3196      if (err != null) {
3197        future.completeExceptionally(err);
3198        return;
3199      }
3200      // Accessed by multiple threads.
3201      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3202      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3203      serverNames.stream().forEach(serverName -> {
3204        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3205          if (err2 != null) {
3206            future.completeExceptionally(unwrapCompletionException(err2));
3207          } else {
3208            serverStates.put(serverName, serverState);
3209          }
3210        }));
3211      });
3212      addListener(
3213        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3214        (ret, err3) -> {
3215          if (!future.isCompletedExceptionally()) {
3216            if (err3 != null) {
3217              future.completeExceptionally(err3);
3218            } else {
3219              future.complete(serverStates);
3220            }
3221          }
3222        });
3223    });
3224    return future;
3225  }
3226
3227  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3228    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3229    if (serverNamesList.isEmpty()) {
3230      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3231        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3232      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3233        if (err != null) {
3234          future.completeExceptionally(err);
3235        } else {
3236          future.complete(clusterMetrics.getServersName());
3237        }
3238      });
3239      return future;
3240    } else {
3241      List<ServerName> serverList = new ArrayList<>();
3242      for (String regionServerName : serverNamesList) {
3243        ServerName serverName = null;
3244        try {
3245          serverName = ServerName.valueOf(regionServerName);
3246        } catch (Exception e) {
3247          future.completeExceptionally(
3248            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3249        }
3250        if (serverName == null) {
3251          future.completeExceptionally(
3252            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3253        } else {
3254          serverList.add(serverName);
3255        }
3256      }
3257      future.complete(serverList);
3258    }
3259    return future;
3260  }
3261
3262  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3263    return this
3264        .<Boolean>newAdminCaller()
3265        .serverName(serverName)
3266        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3267            Boolean>adminCall(controller, stub,
3268            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3269            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3270  }
3271
3272  @Override
3273  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3274    return this.<Boolean> newMasterCaller()
3275      .action((controller, stub) -> this
3276        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3277          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3278          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3279          (resp) -> resp.getPrevBalanceValue()))
3280      .call();
3281  }
3282
3283  @Override
3284  public CompletableFuture<Boolean> balance(boolean forcible) {
3285    return this
3286        .<Boolean> newMasterCaller()
3287        .action(
3288          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3289            stub, RequestConverter.buildBalanceRequest(forcible),
3290            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3291  }
3292
3293  @Override
3294  public CompletableFuture<Boolean> isBalancerEnabled() {
3295    return this
3296        .<Boolean> newMasterCaller()
3297        .action((controller, stub) ->
3298              this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3299            stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3300                  -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3301  }
3302
3303  @Override
3304  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3305    return this
3306        .<Boolean> newMasterCaller()
3307        .action(
3308          (controller, stub) -> this
3309              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3310                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3311                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3312                    .getPrevNormalizerValue())).call();
3313  }
3314
3315  @Override
3316  public CompletableFuture<Boolean> isNormalizerEnabled() {
3317    return this
3318        .<Boolean> newMasterCaller()
3319        .action(
3320          (controller, stub) -> this
3321              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3322                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3323                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3324                (resp) -> resp.getEnabled())).call();
3325  }
3326
3327  @Override
3328  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3329    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3330  }
3331
3332  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3333    return this
3334      .<Boolean> newMasterCaller()
3335      .action(
3336        (controller, stub) -> this.call(
3337          controller, stub, request, MasterService.Interface::normalize,
3338          NormalizeResponse::getNormalizerRan))
3339      .call();
3340  }
3341
3342  @Override
3343  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3344    return this
3345        .<Boolean> newMasterCaller()
3346        .action(
3347          (controller, stub) -> this
3348              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3349                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3350                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3351                    .getPrevValue())).call();
3352  }
3353
3354  @Override
3355  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3356    return this
3357        .<Boolean> newMasterCaller()
3358        .action(
3359          (controller, stub) -> this
3360              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3361                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3362                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3363        .call();
3364  }
3365
3366  @Override
3367  public CompletableFuture<Boolean> runCleanerChore() {
3368    return this
3369        .<Boolean> newMasterCaller()
3370        .action(
3371          (controller, stub) -> this
3372              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3373                RequestConverter.buildRunCleanerChoreRequest(),
3374                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3375                (resp) -> resp.getCleanerChoreRan())).call();
3376  }
3377
3378  @Override
3379  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3380    return this
3381        .<Boolean> newMasterCaller()
3382        .action(
3383          (controller, stub) -> this
3384              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3385                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3386                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3387                    .getPrevValue())).call();
3388  }
3389
3390  @Override
3391  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3392    return this
3393        .<Boolean> newMasterCaller()
3394        .action(
3395          (controller, stub) -> this
3396              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3397                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3398                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3399                    .getValue())).call();
3400  }
3401
3402  @Override
3403  public CompletableFuture<Integer> runCatalogJanitor() {
3404    return this
3405        .<Integer> newMasterCaller()
3406        .action(
3407          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3408            controller, stub, RequestConverter.buildCatalogScanRequest(),
3409            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3410        .call();
3411  }
3412
3413  @Override
3414  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3415      ServiceCaller<S, R> callable) {
3416    MasterCoprocessorRpcChannelImpl channel =
3417        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3418    S stub = stubMaker.apply(channel);
3419    CompletableFuture<R> future = new CompletableFuture<>();
3420    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3421    callable.call(stub, controller, resp -> {
3422      if (controller.failed()) {
3423        future.completeExceptionally(controller.getFailed());
3424      } else {
3425        future.complete(resp);
3426      }
3427    });
3428    return future;
3429  }
3430
3431  @Override
3432  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3433      ServiceCaller<S, R> callable, ServerName serverName) {
3434    RegionServerCoprocessorRpcChannelImpl channel =
3435        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3436          serverName));
3437    S stub = stubMaker.apply(channel);
3438    CompletableFuture<R> future = new CompletableFuture<>();
3439    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3440    callable.call(stub, controller, resp -> {
3441      if (controller.failed()) {
3442        future.completeExceptionally(controller.getFailed());
3443      } else {
3444        future.complete(resp);
3445      }
3446    });
3447    return future;
3448  }
3449
3450  @Override
3451  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3452    return this.<List<ServerName>> newMasterCaller()
3453      .action((controller, stub) -> this
3454        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3455          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3456          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3457          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3458      .call();
3459  }
3460
3461  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3462    return this.connection.callerFactory.<T> serverRequest()
3463      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3464      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3465      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3466      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3467  }
3468
3469  @Override
3470  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3471    if (tableName == null) {
3472      return failedFuture(new IllegalArgumentException("Table name is null"));
3473    }
3474    CompletableFuture<Void> future = new CompletableFuture<>();
3475    addListener(tableExists(tableName), (exist, err) -> {
3476      if (err != null) {
3477        future.completeExceptionally(err);
3478        return;
3479      }
3480      if (!exist) {
3481        future.completeExceptionally(new TableNotFoundException(
3482          "Table '" + tableName.getNameAsString() + "' does not exists."));
3483        return;
3484      }
3485      addListener(getTableSplits(tableName), (splits, err1) -> {
3486        if (err1 != null) {
3487          future.completeExceptionally(err1);
3488        } else {
3489          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3490            if (err2 != null) {
3491              future.completeExceptionally(err2);
3492            } else {
3493              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3494                if (err3 != null) {
3495                  future.completeExceptionally(err3);
3496                } else {
3497                  future.complete(result3);
3498                }
3499              });
3500            }
3501          });
3502        }
3503      });
3504    });
3505    return future;
3506  }
3507
3508  @Override
3509  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3510    if (tableName == null) {
3511      return failedFuture(new IllegalArgumentException("Table name is null"));
3512    }
3513    CompletableFuture<Void> future = new CompletableFuture<>();
3514    addListener(tableExists(tableName), (exist, err) -> {
3515      if (err != null) {
3516        future.completeExceptionally(err);
3517        return;
3518      }
3519      if (!exist) {
3520        future.completeExceptionally(new TableNotFoundException(
3521          "Table '" + tableName.getNameAsString() + "' does not exists."));
3522        return;
3523      }
3524      addListener(setTableReplication(tableName, false), (result, err2) -> {
3525        if (err2 != null) {
3526          future.completeExceptionally(err2);
3527        } else {
3528          future.complete(result);
3529        }
3530      });
3531    });
3532    return future;
3533  }
3534
3535  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3536    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3537    addListener(
3538      getRegions(tableName).thenApply(regions -> regions.stream()
3539        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3540      (regions, err2) -> {
3541        if (err2 != null) {
3542          future.completeExceptionally(err2);
3543          return;
3544        }
3545        if (regions.size() == 1) {
3546          future.complete(null);
3547        } else {
3548          byte[][] splits = new byte[regions.size() - 1][];
3549          for (int i = 1; i < regions.size(); i++) {
3550            splits[i - 1] = regions.get(i).getStartKey();
3551          }
3552          future.complete(splits);
3553        }
3554      });
3555    return future;
3556  }
3557
3558  /**
3559   * Connect to peer and check the table descriptor on peer:
3560   * <ol>
3561   * <li>Create the same table on peer when not exist.</li>
3562   * <li>Throw an exception if the table already has replication enabled on any of the column
3563   * families.</li>
3564   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3565   * </ol>
3566   * @param tableName name of the table to sync to the peer
3567   * @param splits table split keys
3568   */
3569  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3570      byte[][] splits) {
3571    CompletableFuture<Void> future = new CompletableFuture<>();
3572    addListener(listReplicationPeers(), (peers, err) -> {
3573      if (err != null) {
3574        future.completeExceptionally(err);
3575        return;
3576      }
3577      if (peers == null || peers.size() <= 0) {
3578        future.completeExceptionally(
3579          new IllegalArgumentException("Found no peer cluster for replication."));
3580        return;
3581      }
3582      List<CompletableFuture<Void>> futures = new ArrayList<>();
3583      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3584        .forEach(peer -> {
3585          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3586        });
3587      addListener(
3588        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3589        (result, err2) -> {
3590          if (err2 != null) {
3591            future.completeExceptionally(err2);
3592          } else {
3593            future.complete(result);
3594          }
3595        });
3596    });
3597    return future;
3598  }
3599
3600  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3601      ReplicationPeerDescription peer) {
3602    Configuration peerConf = null;
3603    try {
3604      peerConf =
3605        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3606    } catch (IOException e) {
3607      return failedFuture(e);
3608    }
3609    CompletableFuture<Void> future = new CompletableFuture<>();
3610    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3611      if (err != null) {
3612        future.completeExceptionally(err);
3613        return;
3614      }
3615      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3616        if (err1 != null) {
3617          future.completeExceptionally(err1);
3618          return;
3619        }
3620        AsyncAdmin peerAdmin = conn.getAdmin();
3621        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3622          if (err2 != null) {
3623            future.completeExceptionally(err2);
3624            return;
3625          }
3626          if (!exist) {
3627            CompletableFuture<Void> createTableFuture = null;
3628            if (splits == null) {
3629              createTableFuture = peerAdmin.createTable(tableDesc);
3630            } else {
3631              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3632            }
3633            addListener(createTableFuture, (result, err3) -> {
3634              if (err3 != null) {
3635                future.completeExceptionally(err3);
3636              } else {
3637                future.complete(result);
3638              }
3639            });
3640          } else {
3641            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3642              (result, err4) -> {
3643                if (err4 != null) {
3644                  future.completeExceptionally(err4);
3645                } else {
3646                  future.complete(result);
3647                }
3648              });
3649          }
3650        });
3651      });
3652    });
3653    return future;
3654  }
3655
3656  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3657      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3658    CompletableFuture<Void> future = new CompletableFuture<>();
3659    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3660      if (err != null) {
3661        future.completeExceptionally(err);
3662        return;
3663      }
3664      if (peerTableDesc == null) {
3665        future.completeExceptionally(
3666          new IllegalArgumentException("Failed to get table descriptor for table " +
3667            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3668        return;
3669      }
3670      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3671        future.completeExceptionally(new IllegalArgumentException(
3672          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3673            ", but the table descriptors are not same when compared with source cluster." +
3674            " Thus can not enable the table's replication switch."));
3675        return;
3676      }
3677      future.complete(null);
3678    });
3679    return future;
3680  }
3681
3682  /**
3683   * Set the table's replication switch if the table's replication switch is already not set.
3684   * @param tableName name of the table
3685   * @param enableRep is replication switch enable or disable
3686   */
3687  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3688    CompletableFuture<Void> future = new CompletableFuture<>();
3689    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3690      if (err != null) {
3691        future.completeExceptionally(err);
3692        return;
3693      }
3694      if (!tableDesc.matchReplicationScope(enableRep)) {
3695        int scope =
3696          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3697        TableDescriptor newTableDesc =
3698          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3699        addListener(modifyTable(newTableDesc), (result, err2) -> {
3700          if (err2 != null) {
3701            future.completeExceptionally(err2);
3702          } else {
3703            future.complete(result);
3704          }
3705        });
3706      } else {
3707        future.complete(null);
3708      }
3709    });
3710    return future;
3711  }
3712
3713  @Override
3714  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3715    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3716    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3717      if (err != null) {
3718        future.completeExceptionally(err);
3719        return;
3720      }
3721      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3722        locations.stream().filter(l -> l.getRegion() != null)
3723          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3724          .collect(Collectors.groupingBy(l -> l.getServerName(),
3725            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3726      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3727      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3728      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3729        futures
3730          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3731            if (err2 != null) {
3732              future.completeExceptionally(unwrapCompletionException(err2));
3733            } else {
3734              aggregator.append(stats);
3735            }
3736          }));
3737      }
3738      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3739        (ret, err3) -> {
3740          if (err3 != null) {
3741            future.completeExceptionally(unwrapCompletionException(err3));
3742          } else {
3743            future.complete(aggregator.sum());
3744          }
3745        });
3746    });
3747    return future;
3748  }
3749
3750  @Override
3751  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3752      boolean preserveSplits) {
3753    CompletableFuture<Void> future = new CompletableFuture<>();
3754    addListener(tableExists(tableName), (exist, err) -> {
3755      if (err != null) {
3756        future.completeExceptionally(err);
3757        return;
3758      }
3759      if (!exist) {
3760        future.completeExceptionally(new TableNotFoundException(tableName));
3761        return;
3762      }
3763      addListener(tableExists(newTableName), (exist1, err1) -> {
3764        if (err1 != null) {
3765          future.completeExceptionally(err1);
3766          return;
3767        }
3768        if (exist1) {
3769          future.completeExceptionally(new TableExistsException(newTableName));
3770          return;
3771        }
3772        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3773          if (err2 != null) {
3774            future.completeExceptionally(err2);
3775            return;
3776          }
3777          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3778          if (preserveSplits) {
3779            addListener(getTableSplits(tableName), (splits, err3) -> {
3780              if (err3 != null) {
3781                future.completeExceptionally(err3);
3782              } else {
3783                addListener(
3784                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3785                  (result, err4) -> {
3786                    if (err4 != null) {
3787                      future.completeExceptionally(err4);
3788                    } else {
3789                      future.complete(result);
3790                    }
3791                  });
3792              }
3793            });
3794          } else {
3795            addListener(createTable(newTableDesc), (result, err5) -> {
3796              if (err5 != null) {
3797                future.completeExceptionally(err5);
3798              } else {
3799                future.complete(result);
3800              }
3801            });
3802          }
3803        });
3804      });
3805    });
3806    return future;
3807  }
3808
3809  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3810      List<RegionInfo> hris) {
3811    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3812      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3813        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3814        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3815        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3816      .serverName(serverName).call();
3817  }
3818
3819  @Override
3820  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3821    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3822        .action((controller, stub) -> this
3823            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3824              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3825              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3826              resp -> resp.getPreviousRpcThrottleEnabled()))
3827        .call();
3828    return future;
3829  }
3830
3831  @Override
3832  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3833    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3834        .action((controller, stub) -> this
3835            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3836              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3837              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3838              resp -> resp.getRpcThrottleEnabled()))
3839        .call();
3840    return future;
3841  }
3842
3843  @Override
3844  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3845    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3846        .action((controller, stub) -> this
3847            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3848              controller, stub,
3849              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3850                  .build(),
3851              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3852              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3853        .call();
3854    return future;
3855  }
3856
3857  @Override
3858  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3859    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3860      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3861      Map<TableName, Long>> call(controller, stub,
3862        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3863        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3864        resp -> resp.getSizesList().stream().collect(Collectors
3865          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3866      .call();
3867  }
3868
3869  @Override
3870  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3871      ServerName serverName) {
3872    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3873      .action((controller, stub) -> this
3874        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3875        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3876          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3877          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3878          resp -> resp.getSnapshotsList().stream()
3879            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3880              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3881      .serverName(serverName).call();
3882  }
3883
3884  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3885      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3886    return this.<SpaceQuotaSnapshot> newMasterCaller()
3887      .action((controller, stub) -> this
3888        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3889          RequestConverter.buildGetQuotaStatesRequest(),
3890          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3891      .call();
3892  }
3893
3894  @Override
3895  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3896    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3897      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3898      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3899  }
3900
3901  @Override
3902  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3903    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3904    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3905      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3906      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3907  }
3908
3909  @Override
3910  public CompletableFuture<Void> grant(UserPermission userPermission,
3911      boolean mergeExistingPermissions) {
3912    return this.<Void> newMasterCaller()
3913        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3914          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3915          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3916        .call();
3917  }
3918
3919  @Override
3920  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3921    return this.<Void> newMasterCaller()
3922        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3923          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3924          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3925        .call();
3926  }
3927
3928  @Override
3929  public CompletableFuture<List<UserPermission>>
3930      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3931    return this.<List<UserPermission>> newMasterCaller().action((controller,
3932        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3933            List<UserPermission>> call(controller, stub,
3934              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3935              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3936              resp -> resp.getUserPermissionList().stream()
3937                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3938                .collect(Collectors.toList())))
3939        .call();
3940  }
3941
3942  @Override
3943  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3944      List<Permission> permissions) {
3945    return this.<List<Boolean>> newMasterCaller()
3946        .action((controller, stub) -> this
3947            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3948              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3949              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3950              resp -> resp.getHasUserPermissionList()))
3951        .call();
3952  }
3953
3954  @Override
3955  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3956      final boolean sync) {
3957    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3958        .call(controller, stub, RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3959            MasterService.Interface::switchSnapshotCleanup,
3960            SetSnapshotCleanupResponse::getPrevSnapshotCleanup)).call();
3961  }
3962
3963  @Override
3964  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3965    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3966        .call(controller, stub, RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3967            MasterService.Interface::isSnapshotCleanupEnabled,
3968            IsSnapshotCleanupEnabledResponse::getEnabled)).call();
3969  }
3970
3971  @Override
3972  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
3973    return this.<Void> newMasterCaller()
3974        .action((controller, stub) -> this.
3975            <MoveServersRequest, MoveServersResponse, Void> call(controller, stub,
3976                RequestConverter.buildMoveServersRequest(servers, groupName),
3977              (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
3978        .call();
3979  }
3980
3981  @Override
3982  public CompletableFuture<Void> addRSGroup(String groupName) {
3983    return this.<Void> newMasterCaller()
3984        .action(((controller, stub) -> this.
3985            <AddRSGroupRequest, AddRSGroupResponse, Void> call(controller, stub,
3986                AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3987              (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)))
3988        .call();
3989  }
3990
3991  @Override
3992  public CompletableFuture<Void> removeRSGroup(String groupName) {
3993    return this.<Void> newMasterCaller()
3994        .action((controller, stub) -> this.
3995            <RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(controller, stub,
3996                RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3997              (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
3998        .call();
3999  }
4000
4001  @Override
4002  public CompletableFuture<Boolean> balanceRSGroup(String groupName) {
4003    return this.<Boolean> newMasterCaller()
4004        .action((controller, stub) -> this.
4005            <BalanceRSGroupRequest, BalanceRSGroupResponse, Boolean> call(controller, stub,
4006                BalanceRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4007              (s, c, req, done) -> s.balanceRSGroup(c, req, done), resp -> resp.getBalanceRan()))
4008        .call();
4009  }
4010
4011  @Override
4012  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4013    return this.<List<RSGroupInfo>> newMasterCaller()
4014        .action((controller, stub) -> this
4015            .<ListRSGroupInfosRequest, ListRSGroupInfosResponse, List<RSGroupInfo>> call(
4016                controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4017              (s, c, req, done) -> s.listRSGroupInfos(c, req, done),
4018              resp -> resp.getRSGroupInfoList().stream()
4019                  .map(r -> ProtobufUtil.toGroupInfo(r))
4020                  .collect(Collectors.toList())))
4021        .call();
4022  }
4023
4024  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4025      final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4026      final String logType) {
4027    if (CollectionUtils.isEmpty(serverNames)) {
4028      return CompletableFuture.completedFuture(Collections.emptyList());
4029    }
4030    return CompletableFuture.supplyAsync(() -> serverNames.stream()
4031      .map((ServerName serverName) ->
4032        getSlowLogResponseFromServer(serverName, filterParams, limit, logType))
4033      .map(CompletableFuture::join)
4034      .flatMap(List::stream)
4035      .collect(Collectors.toList()));
4036  }
4037
4038  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4039      Map<String, Object> filterParams, int limit, String logType) {
4040    return this.<List<LogEntry>>newAdminCaller().action((controller, stub) -> this
4041      .adminCall(controller, stub,
4042        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4043        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4044      .serverName(serverName).call();
4045  }
4046
4047  @Override
4048  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
4049      @Nullable Set<ServerName> serverNames) {
4050    if (CollectionUtils.isEmpty(serverNames)) {
4051      return CompletableFuture.completedFuture(Collections.emptyList());
4052    }
4053    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
4054      .map(this::clearSlowLogsResponses)
4055      .collect(Collectors.toList());
4056    return convertToFutureOfList(clearSlowLogResponseList);
4057  }
4058
4059  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4060    return this.<Boolean>newAdminCaller()
4061      .action(((controller, stub) -> this
4062        .adminCall(
4063          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
4064          AdminService.Interface::clearSlowLogsResponses,
4065          ProtobufUtil::toClearSlowLogPayload))
4066      ).serverName(serverName).call();
4067  }
4068
4069  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
4070    List<CompletableFuture<T>> futures) {
4071    CompletableFuture<Void> allDoneFuture =
4072      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4073    return allDoneFuture.thenApply(v ->
4074      futures.stream()
4075        .map(CompletableFuture::join)
4076        .collect(Collectors.toList())
4077    );
4078  }
4079
4080  @Override
4081  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4082    return this.<List<TableName>> newMasterCaller()
4083      .action((controller, stub) -> this
4084        .<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, List<TableName>> call(controller,
4085          stub, ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4086          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4087            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4088      .call();
4089  }
4090
4091  @Override
4092  public CompletableFuture<Pair<List<String>, List<TableName>>>
4093    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4094    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4095      .action((controller, stub) -> this
4096        .<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4097          GetConfiguredNamespacesAndTablesInRSGroupResponse,
4098          Pair<List<String>, List<TableName>>> call(controller, stub,
4099          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4100            .build(),
4101            (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4102            resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4103              .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4104      .call();
4105  }
4106
4107  @Override
4108  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4109    return this.<RSGroupInfo> newMasterCaller()
4110      .action(((controller, stub) -> this
4111        .<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse, RSGroupInfo> call(
4112          controller, stub,
4113          GetRSGroupInfoOfServerRequest.newBuilder()
4114            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4115              .setPort(hostPort.getPort()).build())
4116            .build(),
4117          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4118          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4119      .call();
4120  }
4121
4122  @Override
4123  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4124    return this.<Void> newMasterCaller()
4125      .action((controller, stub) -> this.
4126            <RemoveServersRequest, RemoveServersResponse, Void> call(controller, stub,
4127                RequestConverter.buildRemoveServersRequest(servers),
4128              (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4129        .call();
4130  }
4131
4132  @Override
4133  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4134    CompletableFuture<Void> future = new CompletableFuture<>();
4135    for (TableName tableName : tables) {
4136      addListener(tableExists(tableName), (exist, err) -> {
4137        if (err != null) {
4138          future.completeExceptionally(err);
4139          return;
4140        }
4141        if (!exist) {
4142          future.completeExceptionally(new TableNotFoundException(tableName));
4143          return;
4144        }
4145      });
4146    }
4147    addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> {
4148      if (err != null) {
4149        future.completeExceptionally(err);
4150        return;
4151      }
4152      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4153        future.complete(null);
4154        return;
4155      }
4156      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4157      for (TableDescriptor td : tableDescriptions) {
4158        newTableDescriptors
4159            .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4160      }
4161      addListener(CompletableFuture.allOf(
4162        newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4163        (v, e) -> {
4164          if (e != null) {
4165            future.completeExceptionally(e);
4166          } else {
4167            future.complete(v);
4168          }
4169        });
4170    }));
4171    return future;
4172  }
4173
4174  @Override
4175  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4176    return this.<RSGroupInfo> newMasterCaller().action(((controller, stub) -> this
4177      .<GetRSGroupInfoOfTableRequest, GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller,
4178        stub,
4179        GetRSGroupInfoOfTableRequest.newBuilder().setTableName(ProtobufUtil.toProtoTableName(table))
4180          .build(),
4181        (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4182        resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4183      .call();
4184  }
4185
4186  @Override
4187  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4188    return this.<RSGroupInfo> newMasterCaller()
4189      .action(((controller, stub) -> this
4190        .<GetRSGroupInfoRequest, GetRSGroupInfoResponse, RSGroupInfo> call(controller, stub,
4191          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4192          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4193          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4194      .call();
4195  }
4196
4197  @Override
4198  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4199    return this.<Void> newMasterCaller()
4200      .action(
4201        (
4202          (controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4203            controller,
4204            stub,
4205            RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName).setNewRsgroupName(newName)
4206                                .build(),
4207            (s, c, req, done) -> s.renameRSGroup(c, req, done),
4208            resp -> null
4209          )
4210        )
4211      ).call();
4212  }
4213
4214  @Override
4215  public CompletableFuture<Void>
4216    updateRSGroupConfig(String groupName, Map<String, String> configuration) {
4217    UpdateRSGroupConfigRequest.Builder request = UpdateRSGroupConfigRequest.newBuilder()
4218        .setGroupName(groupName);
4219    if (configuration != null) {
4220      configuration.entrySet().forEach(e ->
4221          request.addConfiguration(NameStringPair.newBuilder().setName(e.getKey())
4222              .setValue(e.getValue()).build()));
4223    }
4224    return this.<Void> newMasterCaller()
4225        .action(((controller, stub) ->
4226            this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
4227                controller, stub, request.build(),
4228              (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4229        ).call();
4230  }
4231
4232  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4233    return this.<List<LogEntry>>newMasterCaller()
4234      .action((controller, stub) ->
4235        this.call(controller, stub,
4236          ProtobufUtil.toBalancerDecisionRequest(limit),
4237          MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse))
4238      .call();
4239  }
4240
4241  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4242    return this.<List<LogEntry>>newMasterCaller()
4243      .action((controller, stub) ->
4244        this.call(controller, stub,
4245          ProtobufUtil.toBalancerRejectionRequest(limit),
4246          MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerRejectionResponse))
4247      .call();
4248  }
4249
4250  @Override
4251  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4252      String logType, ServerType serverType, int limit,
4253      Map<String, Object> filterParams) {
4254    if (logType == null || serverType == null) {
4255      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4256    }
4257    switch (logType){
4258      case "SLOW_LOG":
4259      case "LARGE_LOG":
4260        if (ServerType.MASTER.equals(serverType)) {
4261          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4262        }
4263        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4264      case "BALANCER_DECISION":
4265        if (ServerType.REGION_SERVER.equals(serverType)) {
4266          throw new IllegalArgumentException(
4267            "Balancer Decision logs are not maintained by HRegionServer");
4268        }
4269        return getBalancerDecisions(limit);
4270      case "BALANCER_REJECTION":
4271        if (ServerType.REGION_SERVER.equals(serverType)) {
4272          throw new IllegalArgumentException(
4273            "Balancer Rejection logs are not maintained by HRegionServer");
4274        }
4275        return getBalancerRejections(limit);
4276      default:
4277        return CompletableFuture.completedFuture(Collections.emptyList());
4278    }
4279  }
4280
4281}