001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.CatalogFamilyFormat;
052import org.apache.hadoop.hbase.ClientMetaTableAccessor;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.NamespaceDescriptor;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.RegionMetrics;
061import org.apache.hadoop.hbase.RegionMetricsBuilder;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableExistsException;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.TableNotDisabledException;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.UnknownRegionException;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.Scan.ReadType;
073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
074import org.apache.hadoop.hbase.client.replication.TableCFs;
075import org.apache.hadoop.hbase.client.security.SecurityCapability;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.ipc.HBaseRpcController;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.quotas.QuotaFilter;
080import org.apache.hadoop.hbase.quotas.QuotaSettings;
081import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
083import org.apache.hadoop.hbase.replication.ReplicationException;
084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
086import org.apache.hadoop.hbase.replication.SyncReplicationState;
087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
104import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
105import org.apache.hbase.thirdparty.com.google.protobuf.Message;
106import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
107import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
108import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
109import org.apache.hbase.thirdparty.io.netty.util.Timeout;
110import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
111import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
112
113import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
114import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
333
334/**
335 * The implementation of AsyncAdmin.
336 * <p>
337 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
338 * be finished inside the rpc framework thread, which means that the callbacks registered to the
339 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
340 * this class should not try to do time consuming tasks in the callbacks.
341 * @since 2.0.0
342 * @see AsyncHBaseAdmin
343 * @see AsyncConnection#getAdmin()
344 * @see AsyncConnection#getAdminBuilder()
345 */
346@InterfaceAudience.Private
347class RawAsyncHBaseAdmin implements AsyncAdmin {
348
349  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
350
351  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
352
353  private final AsyncConnectionImpl connection;
354
355  private final HashedWheelTimer retryTimer;
356
357  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
358
359  private final long rpcTimeoutNs;
360
361  private final long operationTimeoutNs;
362
363  private final long pauseNs;
364
365  private final long pauseForCQTBENs;
366
367  private final int maxAttempts;
368
369  private final int startLogErrorsCnt;
370
371  private final NonceGenerator ng;
372
373  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
374      AsyncAdminBuilderBase builder) {
375    this.connection = connection;
376    this.retryTimer = retryTimer;
377    this.metaTable = connection.getTable(META_TABLE_NAME);
378    this.rpcTimeoutNs = builder.rpcTimeoutNs;
379    this.operationTimeoutNs = builder.operationTimeoutNs;
380    this.pauseNs = builder.pauseNs;
381    if (builder.pauseForCQTBENs < builder.pauseNs) {
382      LOG.warn(
383        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
384          " the normal pause value {} ms, use the greater one instead",
385        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
386        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
387      this.pauseForCQTBENs = builder.pauseNs;
388    } else {
389      this.pauseForCQTBENs = builder.pauseForCQTBENs;
390    }
391    this.maxAttempts = builder.maxAttempts;
392    this.startLogErrorsCnt = builder.startLogErrorsCnt;
393    this.ng = connection.getNonceGenerator();
394  }
395
396  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
397    return this.connection.callerFactory.<T> masterRequest()
398      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
399      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
400      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
401      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
402  }
403
404  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
405    return this.connection.callerFactory.<T> adminRequest()
406      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
407      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
408      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
409      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
410  }
411
412  @FunctionalInterface
413  private interface MasterRpcCall<RESP, REQ> {
414    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
415        RpcCallback<RESP> done);
416  }
417
418  @FunctionalInterface
419  private interface AdminRpcCall<RESP, REQ> {
420    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
421        RpcCallback<RESP> done);
422  }
423
424  @FunctionalInterface
425  private interface Converter<D, S> {
426    D convert(S src) throws IOException;
427  }
428
429  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
430      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
431      Converter<RESP, PRESP> respConverter) {
432    CompletableFuture<RESP> future = new CompletableFuture<>();
433    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
434
435      @Override
436      public void run(PRESP resp) {
437        if (controller.failed()) {
438          future.completeExceptionally(controller.getFailed());
439        } else {
440          try {
441            future.complete(respConverter.convert(resp));
442          } catch (IOException e) {
443            future.completeExceptionally(e);
444          }
445        }
446      }
447    });
448    return future;
449  }
450
451  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
452      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
453      Converter<RESP, PRESP> respConverter) {
454    CompletableFuture<RESP> future = new CompletableFuture<>();
455    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
456
457      @Override
458      public void run(PRESP resp) {
459        if (controller.failed()) {
460          future.completeExceptionally(controller.getFailed());
461        } else {
462          try {
463            future.complete(respConverter.convert(resp));
464          } catch (IOException e) {
465            future.completeExceptionally(e);
466          }
467        }
468      }
469    });
470    return future;
471  }
472
473  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
474      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
475      ProcedureBiConsumer consumer) {
476    return procedureCall(b -> {
477    }, preq, rpcCall, respConverter, consumer);
478  }
479
480  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
481      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
482      ProcedureBiConsumer consumer) {
483    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
484  }
485
486  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
487      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
488      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
489      ProcedureBiConsumer consumer) {
490    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
491        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
492    prioritySetter.accept(builder);
493    CompletableFuture<Long> procFuture = builder.call();
494    CompletableFuture<Void> future = waitProcedureResult(procFuture);
495    addListener(future, consumer);
496    return future;
497  }
498
499  @FunctionalInterface
500  private interface TableOperator {
501    CompletableFuture<Void> operate(TableName table);
502  }
503
504  @Override
505  public CompletableFuture<Boolean> tableExists(TableName tableName) {
506    if (TableName.isMetaTableName(tableName)) {
507      return CompletableFuture.completedFuture(true);
508    }
509    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
510  }
511
512  @Override
513  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
514    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
515      includeSysTables));
516  }
517
518  /**
519   * {@link #listTableDescriptors(boolean)}
520   */
521  @Override
522  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
523      boolean includeSysTables) {
524    Preconditions.checkNotNull(pattern,
525      "pattern is null. If you don't specify a pattern, "
526          + "use listTableDescriptors(boolean) instead");
527    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
528      includeSysTables));
529  }
530
531  @Override
532  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
533    Preconditions.checkNotNull(tableNames,
534      "tableNames is null. If you don't specify tableNames, "
535          + "use listTableDescriptors(boolean) instead");
536    if (tableNames.isEmpty()) {
537      return CompletableFuture.completedFuture(Collections.emptyList());
538    }
539    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
540  }
541
542  private CompletableFuture<List<TableDescriptor>>
543      getTableDescriptors(GetTableDescriptorsRequest request) {
544    return this.<List<TableDescriptor>> newMasterCaller()
545        .action((controller, stub) -> this
546            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
547              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
548              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
549        .call();
550  }
551
552  @Override
553  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
554    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
555  }
556
557  @Override
558  public CompletableFuture<List<TableName>>
559      listTableNames(Pattern pattern, boolean includeSysTables) {
560    Preconditions.checkNotNull(pattern,
561        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
562    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
563  }
564
565  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
566    return this
567        .<List<TableName>> newMasterCaller()
568        .action(
569          (controller, stub) -> this
570              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
571                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
572                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
573  }
574
575  @Override
576  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
577    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
578        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
579        List<TableDescriptor>> call(
580          controller, stub,
581          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
582          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
583          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
584        .call();
585  }
586
587  @Override
588  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
589    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
590        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
591        List<TableName>> call(
592          controller, stub,
593          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
594          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
595          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
596        .call();
597  }
598
599  @Override
600  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
601    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
602    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
603      .action((controller, stub) -> this
604        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
605          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
606          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
607          (resp) -> resp.getTableSchemaList()))
608      .call(), (tableSchemas, error) -> {
609        if (error != null) {
610          future.completeExceptionally(error);
611          return;
612        }
613        if (!tableSchemas.isEmpty()) {
614          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
615        } else {
616          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
617        }
618      });
619    return future;
620  }
621
622  @Override
623  public CompletableFuture<Void> createTable(TableDescriptor desc) {
624    return createTable(desc.getTableName(),
625      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
626  }
627
628  @Override
629  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
630      int numRegions) {
631    try {
632      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
633    } catch (IllegalArgumentException e) {
634      return failedFuture(e);
635    }
636  }
637
638  @Override
639  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
640    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
641        + " use createTable(TableDescriptor) instead");
642    try {
643      verifySplitKeys(splitKeys);
644      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
645        splitKeys, ng.getNonceGroup(), ng.newNonce()));
646    } catch (IllegalArgumentException e) {
647      return failedFuture(e);
648    }
649  }
650
651  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
652    Preconditions.checkNotNull(tableName, "table name is null");
653    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
654      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
655      new CreateTableProcedureBiConsumer(tableName));
656  }
657
658  @Override
659  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
660    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
661      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
662        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
663      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
664  }
665
666  @Override
667  public CompletableFuture<Void> deleteTable(TableName tableName) {
668    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
669      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
670      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
671      new DeleteTableProcedureBiConsumer(tableName));
672  }
673
674  @Override
675  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
676    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
677      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
678        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
679      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
680  }
681
682  @Override
683  public CompletableFuture<Void> enableTable(TableName tableName) {
684    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
685      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
686      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
687      new EnableTableProcedureBiConsumer(tableName));
688  }
689
690  @Override
691  public CompletableFuture<Void> disableTable(TableName tableName) {
692    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
693      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
694      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
695      new DisableTableProcedureBiConsumer(tableName));
696  }
697
698  /**
699   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
700   * using passed parameters. Sets error or boolean result ('true' if table matches
701   * the passed-in targetState).
702   */
703  private static CompletableFuture<Boolean> completeCheckTableState(
704      CompletableFuture<Boolean> future, TableState tableState, Throwable error,
705      TableState.State targetState, TableName tableName) {
706    if (error != null) {
707      future.completeExceptionally(error);
708    } else {
709      if (tableState != null) {
710        future.complete(tableState.inStates(targetState));
711      } else {
712        future.completeExceptionally(new TableNotFoundException(tableName));
713      }
714    }
715    return future;
716  }
717
718  @Override
719  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
720    if (TableName.isMetaTableName(tableName)) {
721      return CompletableFuture.completedFuture(true);
722    }
723    CompletableFuture<Boolean> future = new CompletableFuture<>();
724    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
725      (tableState, error) -> {
726        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
727          TableState.State.ENABLED, tableName);
728      });
729    return future;
730  }
731
732  @Override
733  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
734    if (TableName.isMetaTableName(tableName)) {
735      return CompletableFuture.completedFuture(false);
736    }
737    CompletableFuture<Boolean> future = new CompletableFuture<>();
738    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
739      (tableState, error) -> {
740        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
741          TableState.State.DISABLED, tableName);
742      });
743    return future;
744  }
745
746  @Override
747  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
748    if (TableName.isMetaTableName(tableName)) {
749      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
750        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
751    }
752    CompletableFuture<Boolean> future = new CompletableFuture<>();
753    addListener(isTableEnabled(tableName), (enabled, error) -> {
754      if (error != null) {
755        if (error instanceof TableNotFoundException) {
756          future.complete(false);
757        } else {
758          future.completeExceptionally(error);
759        }
760        return;
761      }
762      if (!enabled) {
763        future.complete(false);
764      } else {
765        addListener(
766          ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
767          (locations, error1) -> {
768            if (error1 != null) {
769              future.completeExceptionally(error1);
770              return;
771            }
772            List<HRegionLocation> notDeployedRegions = locations.stream()
773              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
774            if (notDeployedRegions.size() > 0) {
775              if (LOG.isDebugEnabled()) {
776                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
777              }
778              future.complete(false);
779              return;
780            }
781            future.complete(true);
782          });
783      }
784    });
785    return future;
786  }
787
788  @Override
789  public CompletableFuture<Void> addColumnFamily(
790      TableName tableName, ColumnFamilyDescriptor columnFamily) {
791    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
792      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
793        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
794      new AddColumnFamilyProcedureBiConsumer(tableName));
795  }
796
797  @Override
798  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
799    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
800      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
801        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
802      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
803  }
804
805  @Override
806  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
807      ColumnFamilyDescriptor columnFamily) {
808    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
809      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
810        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
811      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
812  }
813
814  @Override
815  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
816    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
817      RequestConverter.buildCreateNamespaceRequest(descriptor),
818      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
819      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
820  }
821
822  @Override
823  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
824    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
825      RequestConverter.buildModifyNamespaceRequest(descriptor),
826      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
827      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
828  }
829
830  @Override
831  public CompletableFuture<Void> deleteNamespace(String name) {
832    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
833      RequestConverter.buildDeleteNamespaceRequest(name),
834      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
835      new DeleteNamespaceProcedureBiConsumer(name));
836  }
837
838  @Override
839  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
840    return this
841        .<NamespaceDescriptor> newMasterCaller()
842        .action(
843          (controller, stub) -> this
844              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
845                  call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
846                    (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
847                      -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
848  }
849
850  @Override
851  public CompletableFuture<List<String>> listNamespaces() {
852    return this
853        .<List<String>> newMasterCaller()
854        .action(
855          (controller, stub) -> this
856              .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
857                controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
858                  done) -> s.listNamespaces(c, req, done),
859                (resp) -> resp.getNamespaceNameList())).call();
860  }
861
862  @Override
863  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
864    return this
865        .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
866              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
867                  List<NamespaceDescriptor>> call(controller, stub,
868                  ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
869                      s.listNamespaceDescriptors(c, req, done),
870                    (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
871  }
872
873  @Override
874  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
875    return this.<List<RegionInfo>> newAdminCaller()
876        .action((controller, stub) -> this
877            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
878              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
879              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
880              resp -> ProtobufUtil.getRegionInfos(resp)))
881        .serverName(serverName).call();
882  }
883
884  @Override
885  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
886    if (tableName.equals(META_TABLE_NAME)) {
887      return connection.registry.getMetaRegionLocations()
888        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
889          .collect(Collectors.toList()));
890    } else {
891      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
892        .thenApply(
893          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
894    }
895  }
896
897  @Override
898  public CompletableFuture<Void> flush(TableName tableName) {
899    CompletableFuture<Void> future = new CompletableFuture<>();
900    addListener(tableExists(tableName), (exists, err) -> {
901      if (err != null) {
902        future.completeExceptionally(err);
903      } else if (!exists) {
904        future.completeExceptionally(new TableNotFoundException(tableName));
905      } else {
906        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
907          if (err2 != null) {
908            future.completeExceptionally(err2);
909          } else if (!tableEnabled) {
910            future.completeExceptionally(new TableNotEnabledException(tableName));
911          } else {
912            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
913              new HashMap<>()), (ret, err3) -> {
914                if (err3 != null) {
915                  future.completeExceptionally(err3);
916                } else {
917                  future.complete(ret);
918                }
919              });
920          }
921        });
922      }
923    });
924    return future;
925  }
926
927  @Override
928  public CompletableFuture<Void> flushRegion(byte[] regionName) {
929    return flushRegionInternal(regionName, false).thenAccept(r -> {
930    });
931  }
932
933  /**
934   * This method is for internal use only, where we need the response of the flush.
935   * <p/>
936   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
937   * API.
938   */
939  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
940      boolean writeFlushWALMarker) {
941    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
942    addListener(getRegionLocation(regionName), (location, err) -> {
943      if (err != null) {
944        future.completeExceptionally(err);
945        return;
946      }
947      ServerName serverName = location.getServerName();
948      if (serverName == null) {
949        future
950          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
951        return;
952      }
953      addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
954        if (err2 != null) {
955          future.completeExceptionally(err2);
956        } else {
957          future.complete(ret);
958        }
959      });
960    });
961    return future;
962  }
963
964  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
965      boolean writeFlushWALMarker) {
966    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
967      .action((controller, stub) -> this
968        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
969          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
970          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
971      .call();
972  }
973
974  @Override
975  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
976    CompletableFuture<Void> future = new CompletableFuture<>();
977    addListener(getRegions(sn), (hRegionInfos, err) -> {
978      if (err != null) {
979        future.completeExceptionally(err);
980        return;
981      }
982      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
983      if (hRegionInfos != null) {
984        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
985        })));
986      }
987      addListener(CompletableFuture.allOf(
988        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
989          if (err2 != null) {
990            future.completeExceptionally(err2);
991          } else {
992            future.complete(ret);
993          }
994        });
995    });
996    return future;
997  }
998
999  @Override
1000  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1001    return compact(tableName, null, false, compactType);
1002  }
1003
1004  @Override
1005  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1006      CompactType compactType) {
1007    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1008        + "If you don't specify a columnFamily, use compact(TableName) instead");
1009    return compact(tableName, columnFamily, false, compactType);
1010  }
1011
1012  @Override
1013  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1014    return compactRegion(regionName, null, false);
1015  }
1016
1017  @Override
1018  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1019    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1020        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1021    return compactRegion(regionName, columnFamily, false);
1022  }
1023
1024  @Override
1025  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1026    return compact(tableName, null, true, compactType);
1027  }
1028
1029  @Override
1030  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1031      CompactType compactType) {
1032    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1033        + "If you don't specify a columnFamily, use compact(TableName) instead");
1034    return compact(tableName, columnFamily, true, compactType);
1035  }
1036
1037  @Override
1038  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1039    return compactRegion(regionName, null, true);
1040  }
1041
1042  @Override
1043  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1044    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1045        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1046    return compactRegion(regionName, columnFamily, true);
1047  }
1048
1049  @Override
1050  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1051    return compactRegionServer(sn, false);
1052  }
1053
1054  @Override
1055  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1056    return compactRegionServer(sn, true);
1057  }
1058
1059  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1060    CompletableFuture<Void> future = new CompletableFuture<>();
1061    addListener(getRegions(sn), (hRegionInfos, err) -> {
1062      if (err != null) {
1063        future.completeExceptionally(err);
1064        return;
1065      }
1066      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1067      if (hRegionInfos != null) {
1068        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1069      }
1070      addListener(CompletableFuture.allOf(
1071        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1072          if (err2 != null) {
1073            future.completeExceptionally(err2);
1074          } else {
1075            future.complete(ret);
1076          }
1077        });
1078    });
1079    return future;
1080  }
1081
1082  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1083      boolean major) {
1084    CompletableFuture<Void> future = new CompletableFuture<>();
1085    addListener(getRegionLocation(regionName), (location, err) -> {
1086      if (err != null) {
1087        future.completeExceptionally(err);
1088        return;
1089      }
1090      ServerName serverName = location.getServerName();
1091      if (serverName == null) {
1092        future
1093          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1094        return;
1095      }
1096      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1097        (ret, err2) -> {
1098          if (err2 != null) {
1099            future.completeExceptionally(err2);
1100          } else {
1101            future.complete(ret);
1102          }
1103        });
1104    });
1105    return future;
1106  }
1107
1108  /**
1109   * List all region locations for the specific table.
1110   */
1111  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1112    if (TableName.META_TABLE_NAME.equals(tableName)) {
1113      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1114      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1115        if (err != null) {
1116          future.completeExceptionally(err);
1117        } else if (metaRegions == null || metaRegions.isEmpty() ||
1118          metaRegions.getDefaultRegionLocation() == null) {
1119          future.completeExceptionally(new IOException("meta region does not found"));
1120        } else {
1121          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1122        }
1123      });
1124      return future;
1125    } else {
1126      // For non-meta table, we fetch all locations by scanning hbase:meta table
1127      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1128    }
1129  }
1130
1131  /**
1132   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1133   */
1134  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1135      CompactType compactType) {
1136    CompletableFuture<Void> future = new CompletableFuture<>();
1137
1138    switch (compactType) {
1139      case MOB:
1140        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1141          if (err != null) {
1142            future.completeExceptionally(err);
1143            return;
1144          }
1145          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1146          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1147            if (err2 != null) {
1148              future.completeExceptionally(err2);
1149            } else {
1150              future.complete(ret);
1151            }
1152          });
1153        });
1154        break;
1155      case NORMAL:
1156        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1157          if (err != null) {
1158            future.completeExceptionally(err);
1159            return;
1160          }
1161          if (locations == null || locations.isEmpty()) {
1162            future.completeExceptionally(new TableNotFoundException(tableName));
1163          }
1164          CompletableFuture<?>[] compactFutures =
1165            locations.stream().filter(l -> l.getRegion() != null)
1166              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1167              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1168              .toArray(CompletableFuture<?>[]::new);
1169          // future complete unless all of the compact futures are completed.
1170          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1171            if (err2 != null) {
1172              future.completeExceptionally(err2);
1173            } else {
1174              future.complete(ret);
1175            }
1176          });
1177        });
1178        break;
1179      default:
1180        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1181    }
1182    return future;
1183  }
1184
1185  /**
1186   * Compact the region at specific region server.
1187   */
1188  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1189      final boolean major, byte[] columnFamily) {
1190    return this
1191        .<Void> newAdminCaller()
1192        .serverName(sn)
1193        .action(
1194          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1195            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1196              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1197            resp -> null)).call();
1198  }
1199
1200  private byte[] toEncodeRegionName(byte[] regionName) {
1201    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1202      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1203  }
1204
1205  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1206      CompletableFuture<TableName> result) {
1207    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1208      if (err != null) {
1209        result.completeExceptionally(err);
1210        return;
1211      }
1212      RegionInfo regionInfo = location.getRegion();
1213      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1214        result.completeExceptionally(
1215          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1216        return;
1217      }
1218      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1219        if (!tableName.get().equals(regionInfo.getTable())) {
1220          // tables of this two region should be same.
1221          result.completeExceptionally(
1222            new IllegalArgumentException("Cannot merge regions from two different tables " +
1223              tableName.get() + " and " + regionInfo.getTable()));
1224        } else {
1225          result.complete(tableName.get());
1226        }
1227      }
1228    });
1229  }
1230
1231  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1232    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1233    CompletableFuture<TableName> future = new CompletableFuture<>();
1234    for (byte[] encodedRegionName : encodedRegionNames) {
1235      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1236    }
1237    return future;
1238  }
1239
1240  @Override
1241  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1242    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1243  }
1244
1245  @Override
1246  public CompletableFuture<Boolean> isMergeEnabled() {
1247    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1248  }
1249
1250  @Override
1251  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1252    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1253  }
1254
1255  @Override
1256  public CompletableFuture<Boolean> isSplitEnabled() {
1257    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1258  }
1259
1260  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1261      MasterSwitchType switchType) {
1262    SetSplitOrMergeEnabledRequest request =
1263      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1264    return this.<Boolean> newMasterCaller()
1265      .action((controller, stub) -> this
1266        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1267          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1268          (resp) -> resp.getPrevValueList().get(0)))
1269      .call();
1270  }
1271
1272  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1273    IsSplitOrMergeEnabledRequest request =
1274        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1275    return this
1276        .<Boolean> newMasterCaller()
1277        .action(
1278          (controller, stub) -> this
1279              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1280                controller, stub, request,
1281                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1282                (resp) -> resp.getEnabled())).call();
1283  }
1284
1285  @Override
1286  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1287    if (nameOfRegionsToMerge.size() < 2) {
1288      return failedFuture(new IllegalArgumentException(
1289        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1290    }
1291    CompletableFuture<Void> future = new CompletableFuture<>();
1292    byte[][] encodedNameOfRegionsToMerge =
1293      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1294
1295    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1296      if (err != null) {
1297        future.completeExceptionally(err);
1298        return;
1299      }
1300
1301      final MergeTableRegionsRequest request;
1302      try {
1303        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1304          forcible, ng.getNonceGroup(), ng.newNonce());
1305      } catch (DeserializationException e) {
1306        future.completeExceptionally(e);
1307        return;
1308      }
1309
1310      addListener(
1311        this.procedureCall(tableName, request,
1312          MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
1313          new MergeTableRegionProcedureBiConsumer(tableName)),
1314        (ret, err2) -> {
1315          if (err2 != null) {
1316            future.completeExceptionally(err2);
1317          } else {
1318            future.complete(ret);
1319          }
1320        });
1321    });
1322    return future;
1323  }
1324
1325  @Override
1326  public CompletableFuture<Void> split(TableName tableName) {
1327    CompletableFuture<Void> future = new CompletableFuture<>();
1328    addListener(tableExists(tableName), (exist, error) -> {
1329      if (error != null) {
1330        future.completeExceptionally(error);
1331        return;
1332      }
1333      if (!exist) {
1334        future.completeExceptionally(new TableNotFoundException(tableName));
1335        return;
1336      }
1337      addListener(metaTable
1338        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1339          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1340            ClientMetaTableAccessor.QueryType.REGION))
1341          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1342            ClientMetaTableAccessor.QueryType.REGION))),
1343        (results, err2) -> {
1344          if (err2 != null) {
1345            future.completeExceptionally(err2);
1346            return;
1347          }
1348          if (results != null && !results.isEmpty()) {
1349            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1350            for (Result r : results) {
1351              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1352                continue;
1353              }
1354              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1355              if (rl != null) {
1356                for (HRegionLocation h : rl.getRegionLocations()) {
1357                  if (h != null && h.getServerName() != null) {
1358                    RegionInfo hri = h.getRegion();
1359                    if (hri == null || hri.isSplitParent() ||
1360                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1361                      continue;
1362                    }
1363                    splitFutures.add(split(hri, null));
1364                  }
1365                }
1366              }
1367            }
1368            addListener(
1369              CompletableFuture
1370                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1371              (ret, exception) -> {
1372                if (exception != null) {
1373                  future.completeExceptionally(exception);
1374                  return;
1375                }
1376                future.complete(ret);
1377              });
1378          } else {
1379            future.complete(null);
1380          }
1381        });
1382    });
1383    return future;
1384  }
1385
1386  @Override
1387  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1388    CompletableFuture<Void> result = new CompletableFuture<>();
1389    if (splitPoint == null) {
1390      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1391    }
1392    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1393      (loc, err) -> {
1394        if (err != null) {
1395          result.completeExceptionally(err);
1396        } else if (loc == null || loc.getRegion() == null) {
1397          result.completeExceptionally(new IllegalArgumentException(
1398            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1399        } else {
1400          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1401            if (err2 != null) {
1402              result.completeExceptionally(err2);
1403            } else {
1404              result.complete(ret);
1405            }
1406
1407          });
1408        }
1409      });
1410    return result;
1411  }
1412
1413  @Override
1414  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1415    CompletableFuture<Void> future = new CompletableFuture<>();
1416    addListener(getRegionLocation(regionName), (location, err) -> {
1417      if (err != null) {
1418        future.completeExceptionally(err);
1419        return;
1420      }
1421      RegionInfo regionInfo = location.getRegion();
1422      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1423        future
1424          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1425            "Replicas are auto-split when their primary is split."));
1426        return;
1427      }
1428      ServerName serverName = location.getServerName();
1429      if (serverName == null) {
1430        future
1431          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1432        return;
1433      }
1434      addListener(split(regionInfo, null), (ret, err2) -> {
1435        if (err2 != null) {
1436          future.completeExceptionally(err2);
1437        } else {
1438          future.complete(ret);
1439        }
1440      });
1441    });
1442    return future;
1443  }
1444
1445  @Override
1446  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1447    Preconditions.checkNotNull(splitPoint,
1448      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1449    CompletableFuture<Void> future = new CompletableFuture<>();
1450    addListener(getRegionLocation(regionName), (location, err) -> {
1451      if (err != null) {
1452        future.completeExceptionally(err);
1453        return;
1454      }
1455      RegionInfo regionInfo = location.getRegion();
1456      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1457        future
1458          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1459            "Replicas are auto-split when their primary is split."));
1460        return;
1461      }
1462      ServerName serverName = location.getServerName();
1463      if (serverName == null) {
1464        future
1465          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1466        return;
1467      }
1468      if (regionInfo.getStartKey() != null &&
1469        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1470        future.completeExceptionally(
1471          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1472        return;
1473      }
1474      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1475        if (err2 != null) {
1476          future.completeExceptionally(err2);
1477        } else {
1478          future.complete(ret);
1479        }
1480      });
1481    });
1482    return future;
1483  }
1484
1485  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1486    CompletableFuture<Void> future = new CompletableFuture<>();
1487    TableName tableName = hri.getTable();
1488    final SplitTableRegionRequest request;
1489    try {
1490      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1491        ng.newNonce());
1492    } catch (DeserializationException e) {
1493      future.completeExceptionally(e);
1494      return future;
1495    }
1496
1497    addListener(
1498      this.procedureCall(tableName,
1499        request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
1500        new SplitTableRegionProcedureBiConsumer(tableName)),
1501      (ret, err2) -> {
1502        if (err2 != null) {
1503          future.completeExceptionally(err2);
1504        } else {
1505          future.complete(ret);
1506        }
1507      });
1508    return future;
1509  }
1510
1511  @Override
1512  public CompletableFuture<Void> assign(byte[] regionName) {
1513    CompletableFuture<Void> future = new CompletableFuture<>();
1514    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1515      if (err != null) {
1516        future.completeExceptionally(err);
1517        return;
1518      }
1519      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1520        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1521          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1522          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1523        .call(), (ret, err2) -> {
1524          if (err2 != null) {
1525            future.completeExceptionally(err2);
1526          } else {
1527            future.complete(ret);
1528          }
1529        });
1530    });
1531    return future;
1532  }
1533
1534  @Override
1535  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1536    CompletableFuture<Void> future = new CompletableFuture<>();
1537    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1538      if (err != null) {
1539        future.completeExceptionally(err);
1540        return;
1541      }
1542      addListener(
1543        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1544          .action(((controller, stub) -> this
1545            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1546              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1547              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1548          .call(),
1549        (ret, err2) -> {
1550          if (err2 != null) {
1551            future.completeExceptionally(err2);
1552          } else {
1553            future.complete(ret);
1554          }
1555        });
1556    });
1557    return future;
1558  }
1559
1560  @Override
1561  public CompletableFuture<Void> offline(byte[] regionName) {
1562    CompletableFuture<Void> future = new CompletableFuture<>();
1563    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1564      if (err != null) {
1565        future.completeExceptionally(err);
1566        return;
1567      }
1568      addListener(
1569        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1570          .action(((controller, stub) -> this
1571            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1572              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1573              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1574          .call(),
1575        (ret, err2) -> {
1576          if (err2 != null) {
1577            future.completeExceptionally(err2);
1578          } else {
1579            future.complete(ret);
1580          }
1581        });
1582    });
1583    return future;
1584  }
1585
1586  @Override
1587  public CompletableFuture<Void> move(byte[] regionName) {
1588    CompletableFuture<Void> future = new CompletableFuture<>();
1589    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1590      if (err != null) {
1591        future.completeExceptionally(err);
1592        return;
1593      }
1594      addListener(
1595        moveRegion(regionInfo,
1596          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1597        (ret, err2) -> {
1598          if (err2 != null) {
1599            future.completeExceptionally(err2);
1600          } else {
1601            future.complete(ret);
1602          }
1603        });
1604    });
1605    return future;
1606  }
1607
1608  @Override
1609  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1610    Preconditions.checkNotNull(destServerName,
1611      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1612    CompletableFuture<Void> future = new CompletableFuture<>();
1613    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1614      if (err != null) {
1615        future.completeExceptionally(err);
1616        return;
1617      }
1618      addListener(
1619        moveRegion(regionInfo, RequestConverter
1620          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1621        (ret, err2) -> {
1622          if (err2 != null) {
1623            future.completeExceptionally(err2);
1624          } else {
1625            future.complete(ret);
1626          }
1627        });
1628    });
1629    return future;
1630  }
1631
1632  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1633    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1634      .action(
1635        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1636          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1637      .call();
1638  }
1639
1640  @Override
1641  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1642    return this
1643        .<Void> newMasterCaller()
1644        .action(
1645          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1646            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1647            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1648  }
1649
1650  @Override
1651  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1652    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1653    Scan scan = QuotaTableUtil.makeScan(filter);
1654    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1655        .scan(scan, new AdvancedScanResultConsumer() {
1656          List<QuotaSettings> settings = new ArrayList<>();
1657
1658          @Override
1659          public void onNext(Result[] results, ScanController controller) {
1660            for (Result result : results) {
1661              try {
1662                QuotaTableUtil.parseResultToCollection(result, settings);
1663              } catch (IOException e) {
1664                controller.terminate();
1665                future.completeExceptionally(e);
1666              }
1667            }
1668          }
1669
1670          @Override
1671          public void onError(Throwable error) {
1672            future.completeExceptionally(error);
1673          }
1674
1675          @Override
1676          public void onComplete() {
1677            future.complete(settings);
1678          }
1679        });
1680    return future;
1681  }
1682
1683  @Override
1684  public CompletableFuture<Void> addReplicationPeer(String peerId,
1685      ReplicationPeerConfig peerConfig, boolean enabled) {
1686    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1687      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1688      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1689      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1690  }
1691
1692  @Override
1693  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1694    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1695      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1696      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1697      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1698  }
1699
1700  @Override
1701  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1702    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1703      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1704      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1705      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1706  }
1707
1708  @Override
1709  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1710    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1711      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1712      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1713      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1714  }
1715
1716  @Override
1717  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1718    return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1719      .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
1720          call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1721            (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1722            (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1723  }
1724
1725  @Override
1726  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1727      ReplicationPeerConfig peerConfig) {
1728    return this
1729      .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1730        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1731        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1732        (resp) -> resp.getProcId(),
1733        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1734  }
1735
1736  @Override
1737  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1738      SyncReplicationState clusterState) {
1739    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1740        TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1741        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1742          clusterState),
1743          (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1744          (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1745            () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1746  }
1747
1748  @Override
1749  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1750      Map<TableName, List<String>> tableCfs) {
1751    if (tableCfs == null) {
1752      return failedFuture(new ReplicationException("tableCfs is null"));
1753    }
1754
1755    CompletableFuture<Void> future = new CompletableFuture<Void>();
1756    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1757      if (!completeExceptionally(future, error)) {
1758        ReplicationPeerConfig newPeerConfig =
1759          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1760        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1761          if (!completeExceptionally(future, error)) {
1762            future.complete(result);
1763          }
1764        });
1765      }
1766    });
1767    return future;
1768  }
1769
1770  @Override
1771  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1772      Map<TableName, List<String>> tableCfs) {
1773    if (tableCfs == null) {
1774      return failedFuture(new ReplicationException("tableCfs is null"));
1775    }
1776
1777    CompletableFuture<Void> future = new CompletableFuture<Void>();
1778    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1779      if (!completeExceptionally(future, error)) {
1780        ReplicationPeerConfig newPeerConfig = null;
1781        try {
1782          newPeerConfig = ReplicationPeerConfigUtil
1783            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1784        } catch (ReplicationException e) {
1785          future.completeExceptionally(e);
1786          return;
1787        }
1788        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1789          if (!completeExceptionally(future, error)) {
1790            future.complete(result);
1791          }
1792        });
1793      }
1794    });
1795    return future;
1796  }
1797
1798  @Override
1799  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1800    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1801  }
1802
1803  @Override
1804  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1805    Preconditions.checkNotNull(pattern,
1806      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1807    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1808  }
1809
1810  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1811      ListReplicationPeersRequest request) {
1812    return this
1813        .<List<ReplicationPeerDescription>> newMasterCaller()
1814        .action(
1815          (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1816              List<ReplicationPeerDescription>> call(controller, stub, request,
1817                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1818                (resp) -> resp.getPeerDescList().stream()
1819                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1820                    .collect(Collectors.toList()))).call();
1821  }
1822
1823  @Override
1824  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1825    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1826    addListener(listTableDescriptors(), (tables, error) -> {
1827      if (!completeExceptionally(future, error)) {
1828        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1829        tables.forEach(table -> {
1830          Map<String, Integer> cfs = new HashMap<>();
1831          Stream.of(table.getColumnFamilies())
1832            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1833            .forEach(column -> {
1834              cfs.put(column.getNameAsString(), column.getScope());
1835            });
1836          if (!cfs.isEmpty()) {
1837            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1838          }
1839        });
1840        future.complete(replicatedTableCFs);
1841      }
1842    });
1843    return future;
1844  }
1845
1846  @Override
1847  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1848    SnapshotProtos.SnapshotDescription snapshot =
1849      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1850    try {
1851      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1852    } catch (IllegalArgumentException e) {
1853      return failedFuture(e);
1854    }
1855    CompletableFuture<Void> future = new CompletableFuture<>();
1856    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1857    addListener(this.<Long> newMasterCaller()
1858      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1859        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1860        resp -> resp.getExpectedTimeout()))
1861      .call(), (expectedTimeout, err) -> {
1862        if (err != null) {
1863          future.completeExceptionally(err);
1864          return;
1865        }
1866        TimerTask pollingTask = new TimerTask() {
1867          int tries = 0;
1868          long startTime = EnvironmentEdgeManager.currentTime();
1869          long endTime = startTime + expectedTimeout;
1870          long maxPauseTime = expectedTimeout / maxAttempts;
1871
1872          @Override
1873          public void run(Timeout timeout) throws Exception {
1874            if (EnvironmentEdgeManager.currentTime() < endTime) {
1875              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1876                if (err2 != null) {
1877                  future.completeExceptionally(err2);
1878                } else if (done) {
1879                  future.complete(null);
1880                } else {
1881                  // retry again after pauseTime.
1882                  long pauseTime =
1883                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1884                  pauseTime = Math.min(pauseTime, maxPauseTime);
1885                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1886                    TimeUnit.MILLISECONDS);
1887                }
1888              });
1889            } else {
1890              future.completeExceptionally(
1891                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1892                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1893            }
1894          }
1895        };
1896        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1897      });
1898    return future;
1899  }
1900
1901  @Override
1902  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1903    return this
1904        .<Boolean> newMasterCaller()
1905        .action(
1906          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1907            controller,
1908            stub,
1909            IsSnapshotDoneRequest.newBuilder()
1910                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1911                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1912  }
1913
1914  @Override
1915  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1916    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1917      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1918      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1919    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1920  }
1921
1922  @Override
1923  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1924      boolean restoreAcl) {
1925    CompletableFuture<Void> future = new CompletableFuture<>();
1926    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1927      if (err != null) {
1928        future.completeExceptionally(err);
1929        return;
1930      }
1931      TableName tableName = null;
1932      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1933        for (SnapshotDescription snap : snapshotDescriptions) {
1934          if (snap.getName().equals(snapshotName)) {
1935            tableName = snap.getTableName();
1936            break;
1937          }
1938        }
1939      }
1940      if (tableName == null) {
1941        future.completeExceptionally(new RestoreSnapshotException(
1942          "Unable to find the table name for snapshot=" + snapshotName));
1943        return;
1944      }
1945      final TableName finalTableName = tableName;
1946      addListener(tableExists(finalTableName), (exists, err2) -> {
1947        if (err2 != null) {
1948          future.completeExceptionally(err2);
1949        } else if (!exists) {
1950          // if table does not exist, then just clone snapshot into new table.
1951          completeConditionalOnFuture(future,
1952            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1953        } else {
1954          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1955            if (err4 != null) {
1956              future.completeExceptionally(err4);
1957            } else if (!disabled) {
1958              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1959            } else {
1960              completeConditionalOnFuture(future,
1961                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1962            }
1963          });
1964        }
1965      });
1966    });
1967    return future;
1968  }
1969
1970  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1971      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1972    if (takeFailSafeSnapshot) {
1973      CompletableFuture<Void> future = new CompletableFuture<>();
1974      // Step.1 Take a snapshot of the current state
1975      String failSafeSnapshotSnapshotNameFormat =
1976        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1977          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1978      final String failSafeSnapshotSnapshotName =
1979        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1980          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1981          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1982      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1983      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1984        if (err != null) {
1985          future.completeExceptionally(err);
1986        } else {
1987          // Step.2 Restore snapshot
1988          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1989            (void2, err2) -> {
1990              if (err2 != null) {
1991                // Step.3.a Something went wrong during the restore and try to rollback.
1992                addListener(
1993                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1994                  (void3, err3) -> {
1995                    if (err3 != null) {
1996                      future.completeExceptionally(err3);
1997                    } else {
1998                      String msg =
1999                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
2000                          failSafeSnapshotSnapshotName + " succeeded.";
2001                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2002                    }
2003                  });
2004              } else {
2005                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2006                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2007                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2008                  if (err3 != null) {
2009                    LOG.error(
2010                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2011                      err3);
2012                  }
2013                  future.complete(ret3);
2014                });
2015              }
2016            });
2017        }
2018      });
2019      return future;
2020    } else {
2021      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
2022    }
2023  }
2024
2025  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2026      CompletableFuture<T> parentFuture) {
2027    addListener(parentFuture, (res, err) -> {
2028      if (err != null) {
2029        dependentFuture.completeExceptionally(err);
2030      } else {
2031        dependentFuture.complete(res);
2032      }
2033    });
2034  }
2035
2036  @Override
2037  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2038      boolean restoreAcl) {
2039    CompletableFuture<Void> future = new CompletableFuture<>();
2040    addListener(tableExists(tableName), (exists, err) -> {
2041      if (err != null) {
2042        future.completeExceptionally(err);
2043      } else if (exists) {
2044        future.completeExceptionally(new TableExistsException(tableName));
2045      } else {
2046        completeConditionalOnFuture(future,
2047          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2048      }
2049    });
2050    return future;
2051  }
2052
2053  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2054      boolean restoreAcl) {
2055    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2056      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2057    try {
2058      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2059    } catch (IllegalArgumentException e) {
2060      return failedFuture(e);
2061    }
2062    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2063      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2064        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2065          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2066        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2067      .call());
2068  }
2069
2070  @Override
2071  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2072    return getCompletedSnapshots(null);
2073  }
2074
2075  @Override
2076  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2077    Preconditions.checkNotNull(pattern,
2078      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2079    return getCompletedSnapshots(pattern);
2080  }
2081
2082  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2083    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2084        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2085        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2086          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2087          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2088        .call();
2089  }
2090
2091  @Override
2092  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2093    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2094        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2095    return getCompletedSnapshots(tableNamePattern, null);
2096  }
2097
2098  @Override
2099  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2100      Pattern snapshotNamePattern) {
2101    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2102        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2103    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2104        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2105    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2106  }
2107
2108  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2109      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2110    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2111    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2112      if (err != null) {
2113        future.completeExceptionally(err);
2114        return;
2115      }
2116      if (tableNames == null || tableNames.size() <= 0) {
2117        future.complete(Collections.emptyList());
2118        return;
2119      }
2120      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2121        if (err2 != null) {
2122          future.completeExceptionally(err2);
2123          return;
2124        }
2125        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2126          future.complete(Collections.emptyList());
2127          return;
2128        }
2129        future.complete(snapshotDescList.stream()
2130          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2131          .collect(Collectors.toList()));
2132      });
2133    });
2134    return future;
2135  }
2136
2137  @Override
2138  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2139    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2140  }
2141
2142  @Override
2143  public CompletableFuture<Void> deleteSnapshots() {
2144    return internalDeleteSnapshots(null, null);
2145  }
2146
2147  @Override
2148  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2149    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2150        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2151    return internalDeleteSnapshots(null, snapshotNamePattern);
2152  }
2153
2154  @Override
2155  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2156    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2157        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2158    return internalDeleteSnapshots(tableNamePattern, null);
2159  }
2160
2161  @Override
2162  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2163      Pattern snapshotNamePattern) {
2164    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2165        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2166    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2167        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2168    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2169  }
2170
2171  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2172      Pattern snapshotNamePattern) {
2173    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2174    if (tableNamePattern == null) {
2175      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2176    } else {
2177      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2178    }
2179    CompletableFuture<Void> future = new CompletableFuture<>();
2180    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2181      if (err != null) {
2182        future.completeExceptionally(err);
2183        return;
2184      }
2185      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2186        future.complete(null);
2187        return;
2188      }
2189      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2190        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2191          if (e != null) {
2192            future.completeExceptionally(e);
2193          } else {
2194            future.complete(v);
2195          }
2196        });
2197    }));
2198    return future;
2199  }
2200
2201  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2202    return this
2203        .<Void> newMasterCaller()
2204        .action(
2205          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2206            controller,
2207            stub,
2208            DeleteSnapshotRequest.newBuilder()
2209                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2210                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2211  }
2212
2213  @Override
2214  public CompletableFuture<Void> execProcedure(String signature, String instance,
2215      Map<String, String> props) {
2216    CompletableFuture<Void> future = new CompletableFuture<>();
2217    ProcedureDescription procDesc =
2218      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2219    addListener(this.<Long> newMasterCaller()
2220      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2221        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2222        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2223      .call(), (expectedTimeout, err) -> {
2224        if (err != null) {
2225          future.completeExceptionally(err);
2226          return;
2227        }
2228        TimerTask pollingTask = new TimerTask() {
2229          int tries = 0;
2230          long startTime = EnvironmentEdgeManager.currentTime();
2231          long endTime = startTime + expectedTimeout;
2232          long maxPauseTime = expectedTimeout / maxAttempts;
2233
2234          @Override
2235          public void run(Timeout timeout) throws Exception {
2236            if (EnvironmentEdgeManager.currentTime() < endTime) {
2237              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2238                if (err2 != null) {
2239                  future.completeExceptionally(err2);
2240                  return;
2241                }
2242                if (done) {
2243                  future.complete(null);
2244                } else {
2245                  // retry again after pauseTime.
2246                  long pauseTime =
2247                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2248                  pauseTime = Math.min(pauseTime, maxPauseTime);
2249                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2250                    TimeUnit.MICROSECONDS);
2251                }
2252              });
2253            } else {
2254              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2255                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2256            }
2257          }
2258        };
2259        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2260        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2261      });
2262    return future;
2263  }
2264
2265  @Override
2266  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2267      Map<String, String> props) {
2268    ProcedureDescription proDesc =
2269        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2270    return this.<byte[]> newMasterCaller()
2271        .action(
2272          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2273            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2274            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2275            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2276        .call();
2277  }
2278
2279  @Override
2280  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2281      Map<String, String> props) {
2282    ProcedureDescription proDesc =
2283        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2284    return this.<Boolean> newMasterCaller()
2285        .action((controller, stub) -> this
2286            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2287              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2288              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2289        .call();
2290  }
2291
2292  @Override
2293  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2294    return this.<Boolean> newMasterCaller().action(
2295      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2296        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2297        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2298        .call();
2299  }
2300
2301  @Override
2302  public CompletableFuture<String> getProcedures() {
2303    return this
2304        .<String> newMasterCaller()
2305        .action(
2306          (controller, stub) -> this
2307              .<GetProceduresRequest, GetProceduresResponse, String> call(
2308                controller, stub, GetProceduresRequest.newBuilder().build(),
2309                (s, c, req, done) -> s.getProcedures(c, req, done),
2310                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2311  }
2312
2313  @Override
2314  public CompletableFuture<String> getLocks() {
2315    return this
2316        .<String> newMasterCaller()
2317        .action(
2318          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2319            controller, stub, GetLocksRequest.newBuilder().build(),
2320            (s, c, req, done) -> s.getLocks(c, req, done),
2321            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2322  }
2323
2324  @Override
2325  public CompletableFuture<Void> decommissionRegionServers(
2326      List<ServerName> servers, boolean offload) {
2327    return this.<Void> newMasterCaller()
2328        .action((controller, stub) -> this
2329          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2330            controller, stub,
2331              RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2332            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2333        .call();
2334  }
2335
2336  @Override
2337  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2338    return this.<List<ServerName>> newMasterCaller()
2339        .action((controller, stub) -> this
2340          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2341            List<ServerName>> call(
2342              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2343              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2344              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2345                  .collect(Collectors.toList())))
2346        .call();
2347  }
2348
2349  @Override
2350  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2351      List<byte[]> encodedRegionNames) {
2352    return this.<Void> newMasterCaller()
2353        .action((controller, stub) ->
2354            this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2355                controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2356                    server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2357                        c, req, done), resp -> null)).call();
2358  }
2359
2360  /**
2361   * Get the region location for the passed region name. The region name may be a full region name
2362   * or encoded region name. If the region does not found, then it'll throw an
2363   * UnknownRegionException wrapped by a {@link CompletableFuture}
2364   * @param regionNameOrEncodedRegionName region name or encoded region name
2365   * @return region location, wrapped by a {@link CompletableFuture}
2366   */
2367  @VisibleForTesting
2368  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2369    if (regionNameOrEncodedRegionName == null) {
2370      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2371    }
2372    try {
2373      CompletableFuture<Optional<HRegionLocation>> future;
2374      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2375        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2376        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2377          // old format encodedName, should be meta region
2378          future = connection.registry.getMetaRegionLocations()
2379            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2380              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2381        } else {
2382          future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2383            regionNameOrEncodedRegionName);
2384        }
2385      } else {
2386        RegionInfo regionInfo =
2387          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2388        if (regionInfo.isMetaRegion()) {
2389          future = connection.registry.getMetaRegionLocations()
2390            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2391              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2392              .findFirst());
2393        } else {
2394          future =
2395            ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2396        }
2397      }
2398
2399      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2400      addListener(future, (location, err) -> {
2401        if (err != null) {
2402          returnedFuture.completeExceptionally(err);
2403          return;
2404        }
2405        if (!location.isPresent() || location.get().getRegion() == null) {
2406          returnedFuture.completeExceptionally(
2407            new UnknownRegionException("Invalid region name or encoded region name: " +
2408              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2409        } else {
2410          returnedFuture.complete(location.get());
2411        }
2412      });
2413      return returnedFuture;
2414    } catch (IOException e) {
2415      return failedFuture(e);
2416    }
2417  }
2418
2419  /**
2420   * Get the region info for the passed region name. The region name may be a full region name or
2421   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2422   * wrapped by a {@link CompletableFuture}
2423   * @return region info, wrapped by a {@link CompletableFuture}
2424   */
2425  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2426    if (regionNameOrEncodedRegionName == null) {
2427      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2428    }
2429
2430    if (Bytes.equals(regionNameOrEncodedRegionName,
2431      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2432      Bytes.equals(regionNameOrEncodedRegionName,
2433        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2434      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2435    }
2436
2437    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2438    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2439      if (err != null) {
2440        future.completeExceptionally(err);
2441      } else {
2442        future.complete(location.getRegion());
2443      }
2444    });
2445    return future;
2446  }
2447
2448  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2449    if (numRegions < 3) {
2450      throw new IllegalArgumentException("Must create at least three regions");
2451    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2452      throw new IllegalArgumentException("Start key must be smaller than end key");
2453    }
2454    if (numRegions == 3) {
2455      return new byte[][] { startKey, endKey };
2456    }
2457    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2458    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2459      throw new IllegalArgumentException("Unable to split key range into enough regions");
2460    }
2461    return splitKeys;
2462  }
2463
2464  private void verifySplitKeys(byte[][] splitKeys) {
2465    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2466    // Verify there are no duplicate split keys
2467    byte[] lastKey = null;
2468    for (byte[] splitKey : splitKeys) {
2469      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2470        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2471      }
2472      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2473        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2474            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2475      }
2476      lastKey = splitKey;
2477    }
2478  }
2479
2480  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2481
2482    abstract void onFinished();
2483
2484    abstract void onError(Throwable error);
2485
2486    @Override
2487    public void accept(Void v, Throwable error) {
2488      if (error != null) {
2489        onError(error);
2490        return;
2491      }
2492      onFinished();
2493    }
2494  }
2495
2496  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2497    protected final TableName tableName;
2498
2499    TableProcedureBiConsumer(TableName tableName) {
2500      this.tableName = tableName;
2501    }
2502
2503    abstract String getOperationType();
2504
2505    String getDescription() {
2506      return "Operation: " + getOperationType() + ", " + "Table Name: "
2507          + tableName.getNameWithNamespaceInclAsString();
2508    }
2509
2510    @Override
2511    void onFinished() {
2512      LOG.info(getDescription() + " completed");
2513    }
2514
2515    @Override
2516    void onError(Throwable error) {
2517      LOG.info(getDescription() + " failed with " + error.getMessage());
2518    }
2519  }
2520
2521  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2522    protected final String namespaceName;
2523
2524    NamespaceProcedureBiConsumer(String namespaceName) {
2525      this.namespaceName = namespaceName;
2526    }
2527
2528    abstract String getOperationType();
2529
2530    String getDescription() {
2531      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2532    }
2533
2534    @Override
2535    void onFinished() {
2536      LOG.info(getDescription() + " completed");
2537    }
2538
2539    @Override
2540    void onError(Throwable error) {
2541      LOG.info(getDescription() + " failed with " + error.getMessage());
2542    }
2543  }
2544
2545  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2546
2547    CreateTableProcedureBiConsumer(TableName tableName) {
2548      super(tableName);
2549    }
2550
2551    @Override
2552    String getOperationType() {
2553      return "CREATE";
2554    }
2555  }
2556
2557  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2558
2559    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2560      super(tableName);
2561    }
2562
2563    @Override
2564    String getOperationType() {
2565      return "ENABLE";
2566    }
2567  }
2568
2569  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2570
2571    DeleteTableProcedureBiConsumer(TableName tableName) {
2572      super(tableName);
2573    }
2574
2575    @Override
2576    String getOperationType() {
2577      return "DELETE";
2578    }
2579
2580    @Override
2581    void onFinished() {
2582      connection.getLocator().clearCache(this.tableName);
2583      super.onFinished();
2584    }
2585  }
2586
2587  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2588
2589    TruncateTableProcedureBiConsumer(TableName tableName) {
2590      super(tableName);
2591    }
2592
2593    @Override
2594    String getOperationType() {
2595      return "TRUNCATE";
2596    }
2597  }
2598
2599  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2600
2601    EnableTableProcedureBiConsumer(TableName tableName) {
2602      super(tableName);
2603    }
2604
2605    @Override
2606    String getOperationType() {
2607      return "ENABLE";
2608    }
2609  }
2610
2611  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2612
2613    DisableTableProcedureBiConsumer(TableName tableName) {
2614      super(tableName);
2615    }
2616
2617    @Override
2618    String getOperationType() {
2619      return "DISABLE";
2620    }
2621  }
2622
2623  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2624
2625    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2626      super(tableName);
2627    }
2628
2629    @Override
2630    String getOperationType() {
2631      return "ADD_COLUMN_FAMILY";
2632    }
2633  }
2634
2635  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2636
2637    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2638      super(tableName);
2639    }
2640
2641    @Override
2642    String getOperationType() {
2643      return "DELETE_COLUMN_FAMILY";
2644    }
2645  }
2646
2647  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2648
2649    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2650      super(tableName);
2651    }
2652
2653    @Override
2654    String getOperationType() {
2655      return "MODIFY_COLUMN_FAMILY";
2656    }
2657  }
2658
2659  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2660
2661    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2662      super(namespaceName);
2663    }
2664
2665    @Override
2666    String getOperationType() {
2667      return "CREATE_NAMESPACE";
2668    }
2669  }
2670
2671  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2672
2673    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2674      super(namespaceName);
2675    }
2676
2677    @Override
2678    String getOperationType() {
2679      return "DELETE_NAMESPACE";
2680    }
2681  }
2682
2683  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2684
2685    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2686      super(namespaceName);
2687    }
2688
2689    @Override
2690    String getOperationType() {
2691      return "MODIFY_NAMESPACE";
2692    }
2693  }
2694
2695  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2696
2697    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2698      super(tableName);
2699    }
2700
2701    @Override
2702    String getOperationType() {
2703      return "MERGE_REGIONS";
2704    }
2705  }
2706
2707  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2708
2709    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2710      super(tableName);
2711    }
2712
2713    @Override
2714    String getOperationType() {
2715      return "SPLIT_REGION";
2716    }
2717  }
2718
2719  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2720    private final String peerId;
2721    private final Supplier<String> getOperation;
2722
2723    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2724      this.peerId = peerId;
2725      this.getOperation = getOperation;
2726    }
2727
2728    String getDescription() {
2729      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2730    }
2731
2732    @Override
2733    void onFinished() {
2734      LOG.info(getDescription() + " completed");
2735    }
2736
2737    @Override
2738    void onError(Throwable error) {
2739      LOG.info(getDescription() + " failed with " + error.getMessage());
2740    }
2741  }
2742
2743  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2744    CompletableFuture<Void> future = new CompletableFuture<>();
2745    addListener(procFuture, (procId, error) -> {
2746      if (error != null) {
2747        future.completeExceptionally(error);
2748        return;
2749      }
2750      getProcedureResult(procId, future, 0);
2751    });
2752    return future;
2753  }
2754
2755  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2756    addListener(
2757      this.<GetProcedureResultResponse> newMasterCaller()
2758        .action((controller, stub) -> this
2759          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2760            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2761            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2762        .call(),
2763      (response, error) -> {
2764        if (error != null) {
2765          LOG.warn("failed to get the procedure result procId={}", procId,
2766            ConnectionUtils.translateException(error));
2767          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2768            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2769          return;
2770        }
2771        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2772          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2773            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2774          return;
2775        }
2776        if (response.hasException()) {
2777          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2778          future.completeExceptionally(ioe);
2779        } else {
2780          future.complete(null);
2781        }
2782      });
2783  }
2784
2785  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2786    CompletableFuture<T> future = new CompletableFuture<>();
2787    future.completeExceptionally(error);
2788    return future;
2789  }
2790
2791  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2792    if (error != null) {
2793      future.completeExceptionally(error);
2794      return true;
2795    }
2796    return false;
2797  }
2798
2799  @Override
2800  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2801    return getClusterMetrics(EnumSet.allOf(Option.class));
2802  }
2803
2804  @Override
2805  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2806    return this
2807        .<ClusterMetrics> newMasterCaller()
2808        .action(
2809          (controller, stub) -> this
2810              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2811                stub, RequestConverter.buildGetClusterStatusRequest(options),
2812                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2813                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2814  }
2815
2816  @Override
2817  public CompletableFuture<Void> shutdown() {
2818    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2819      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2820        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2821        resp -> null))
2822      .call();
2823  }
2824
2825  @Override
2826  public CompletableFuture<Void> stopMaster() {
2827    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2828      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2829        controller, stub, StopMasterRequest.newBuilder().build(),
2830        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2831      .call();
2832  }
2833
2834  @Override
2835  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2836    StopServerRequest request = RequestConverter
2837      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2838    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2839      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2840        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2841        resp -> null))
2842      .serverName(serverName).call();
2843  }
2844
2845  @Override
2846  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2847    return this
2848        .<Void> newAdminCaller()
2849        .action(
2850          (controller, stub) -> this
2851              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2852                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2853                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2854        .serverName(serverName).call();
2855  }
2856
2857  @Override
2858  public CompletableFuture<Void> updateConfiguration() {
2859    CompletableFuture<Void> future = new CompletableFuture<Void>();
2860    addListener(
2861      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2862      (status, err) -> {
2863        if (err != null) {
2864          future.completeExceptionally(err);
2865        } else {
2866          List<CompletableFuture<Void>> futures = new ArrayList<>();
2867          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2868          futures.add(updateConfiguration(status.getMasterName()));
2869          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2870          addListener(
2871            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2872            (result, err2) -> {
2873              if (err2 != null) {
2874                future.completeExceptionally(err2);
2875              } else {
2876                future.complete(result);
2877              }
2878            });
2879        }
2880      });
2881    return future;
2882  }
2883
2884  @Override
2885  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2886    return this
2887        .<Void> newAdminCaller()
2888        .action(
2889          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2890            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2891            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2892        .serverName(serverName).call();
2893  }
2894
2895  @Override
2896  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2897    return this
2898        .<Void> newAdminCaller()
2899        .action(
2900          (controller, stub) -> this
2901              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2902                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2903                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2904        .serverName(serverName).call();
2905  }
2906
2907  @Override
2908  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2909    return this
2910        .<List<SecurityCapability>> newMasterCaller()
2911        .action(
2912          (controller, stub) -> this
2913              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
2914                  call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
2915                    (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
2916                    (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
2917        .call();
2918  }
2919
2920  @Override
2921  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2922    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2923  }
2924
2925  @Override
2926  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2927      TableName tableName) {
2928    Preconditions.checkNotNull(tableName,
2929      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2930    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2931  }
2932
2933  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2934      ServerName serverName) {
2935    return this.<List<RegionMetrics>> newAdminCaller()
2936        .action((controller, stub) -> this
2937            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2938              adminCall(controller, stub, request, (s, c, req, done) ->
2939                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2940        .serverName(serverName).call();
2941  }
2942
2943  @Override
2944  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2945    return this
2946        .<Boolean> newMasterCaller()
2947        .action(
2948          (controller, stub) -> this
2949              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2950                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2951                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2952                resp -> resp.getInMaintenanceMode())).call();
2953  }
2954
2955  @Override
2956  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2957      CompactType compactType) {
2958    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2959
2960    switch (compactType) {
2961      case MOB:
2962        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
2963          if (err != null) {
2964            future.completeExceptionally(err);
2965            return;
2966          }
2967          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2968
2969          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2970            .action((controller, stub) -> this
2971              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2972                controller, stub,
2973                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2974                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2975            .call(), (resp2, err2) -> {
2976              if (err2 != null) {
2977                future.completeExceptionally(err2);
2978              } else {
2979                if (resp2.hasCompactionState()) {
2980                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2981                } else {
2982                  future.complete(CompactionState.NONE);
2983                }
2984              }
2985            });
2986        });
2987        break;
2988      case NORMAL:
2989        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2990          if (err != null) {
2991            future.completeExceptionally(err);
2992            return;
2993          }
2994          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
2995          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2996          locations.stream().filter(loc -> loc.getServerName() != null)
2997            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2998            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2999              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3000                // If any region compaction state is MAJOR_AND_MINOR
3001                // the table compaction state is MAJOR_AND_MINOR, too.
3002                if (err2 != null) {
3003                  future.completeExceptionally(unwrapCompletionException(err2));
3004                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3005                  future.complete(regionState);
3006                } else {
3007                  regionStates.add(regionState);
3008                }
3009              }));
3010            });
3011          addListener(
3012            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3013            (ret, err3) -> {
3014              // If future not completed, check all regions's compaction state
3015              if (!future.isCompletedExceptionally() && !future.isDone()) {
3016                CompactionState state = CompactionState.NONE;
3017                for (CompactionState regionState : regionStates) {
3018                  switch (regionState) {
3019                    case MAJOR:
3020                      if (state == CompactionState.MINOR) {
3021                        future.complete(CompactionState.MAJOR_AND_MINOR);
3022                      } else {
3023                        state = CompactionState.MAJOR;
3024                      }
3025                      break;
3026                    case MINOR:
3027                      if (state == CompactionState.MAJOR) {
3028                        future.complete(CompactionState.MAJOR_AND_MINOR);
3029                      } else {
3030                        state = CompactionState.MINOR;
3031                      }
3032                      break;
3033                    case NONE:
3034                    default:
3035                  }
3036                }
3037                if (!future.isDone()) {
3038                  future.complete(state);
3039                }
3040              }
3041            });
3042        });
3043        break;
3044      default:
3045        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3046    }
3047
3048    return future;
3049  }
3050
3051  @Override
3052  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3053    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3054    addListener(getRegionLocation(regionName), (location, err) -> {
3055      if (err != null) {
3056        future.completeExceptionally(err);
3057        return;
3058      }
3059      ServerName serverName = location.getServerName();
3060      if (serverName == null) {
3061        future
3062          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3063        return;
3064      }
3065      addListener(
3066        this.<GetRegionInfoResponse> newAdminCaller()
3067          .action((controller, stub) -> this
3068            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3069              controller, stub,
3070              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3071                true),
3072              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3073          .serverName(serverName).call(),
3074        (resp2, err2) -> {
3075          if (err2 != null) {
3076            future.completeExceptionally(err2);
3077          } else {
3078            if (resp2.hasCompactionState()) {
3079              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3080            } else {
3081              future.complete(CompactionState.NONE);
3082            }
3083          }
3084        });
3085    });
3086    return future;
3087  }
3088
3089  @Override
3090  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3091    MajorCompactionTimestampRequest request =
3092        MajorCompactionTimestampRequest.newBuilder()
3093            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3094    return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3095        this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3096            call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3097                c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3098  }
3099
3100  @Override
3101  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3102      byte[] regionName) {
3103    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3104    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3105    addListener(getRegionInfo(regionName), (region, err) -> {
3106      if (err != null) {
3107        future.completeExceptionally(err);
3108        return;
3109      }
3110      MajorCompactionTimestampForRegionRequest.Builder builder =
3111        MajorCompactionTimestampForRegionRequest.newBuilder();
3112      builder.setRegion(
3113        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3114      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3115        .<MajorCompactionTimestampForRegionRequest,
3116        MajorCompactionTimestampResponse, Optional<Long>> call(
3117          controller, stub, builder.build(),
3118          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3119          ProtobufUtil::toOptionalTimestamp))
3120        .call(), (timestamp, err2) -> {
3121          if (err2 != null) {
3122            future.completeExceptionally(err2);
3123          } else {
3124            future.complete(timestamp);
3125          }
3126        });
3127    });
3128    return future;
3129  }
3130
3131  @Override
3132  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3133      List<String> serverNamesList) {
3134    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3135    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3136      if (err != null) {
3137        future.completeExceptionally(err);
3138        return;
3139      }
3140      // Accessed by multiple threads.
3141      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3142      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3143      serverNames.stream().forEach(serverName -> {
3144        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3145          if (err2 != null) {
3146            future.completeExceptionally(unwrapCompletionException(err2));
3147          } else {
3148            serverStates.put(serverName, serverState);
3149          }
3150        }));
3151      });
3152      addListener(
3153        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3154        (ret, err3) -> {
3155          if (!future.isCompletedExceptionally()) {
3156            if (err3 != null) {
3157              future.completeExceptionally(err3);
3158            } else {
3159              future.complete(serverStates);
3160            }
3161          }
3162        });
3163    });
3164    return future;
3165  }
3166
3167  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3168    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3169    if (serverNamesList.isEmpty()) {
3170      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3171        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3172      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3173        if (err != null) {
3174          future.completeExceptionally(err);
3175        } else {
3176          future.complete(clusterMetrics.getServersName());
3177        }
3178      });
3179      return future;
3180    } else {
3181      List<ServerName> serverList = new ArrayList<>();
3182      for (String regionServerName : serverNamesList) {
3183        ServerName serverName = null;
3184        try {
3185          serverName = ServerName.valueOf(regionServerName);
3186        } catch (Exception e) {
3187          future.completeExceptionally(
3188            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3189        }
3190        if (serverName == null) {
3191          future.completeExceptionally(
3192            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3193        } else {
3194          serverList.add(serverName);
3195        }
3196      }
3197      future.complete(serverList);
3198    }
3199    return future;
3200  }
3201
3202  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3203    return this
3204        .<Boolean>newAdminCaller()
3205        .serverName(serverName)
3206        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3207            Boolean>adminCall(controller, stub,
3208            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3209            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3210  }
3211
3212  @Override
3213  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3214    return this.<Boolean> newMasterCaller()
3215      .action((controller, stub) -> this
3216        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3217          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3218          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3219          (resp) -> resp.getPrevBalanceValue()))
3220      .call();
3221  }
3222
3223  @Override
3224  public CompletableFuture<Boolean> balance(boolean forcible) {
3225    return this
3226        .<Boolean> newMasterCaller()
3227        .action(
3228          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3229            stub, RequestConverter.buildBalanceRequest(forcible),
3230            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3231  }
3232
3233  @Override
3234  public CompletableFuture<Boolean> isBalancerEnabled() {
3235    return this
3236        .<Boolean> newMasterCaller()
3237        .action((controller, stub) ->
3238              this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3239            stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3240                  -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3241  }
3242
3243  @Override
3244  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3245    return this
3246        .<Boolean> newMasterCaller()
3247        .action(
3248          (controller, stub) -> this
3249              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3250                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3251                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3252                    .getPrevNormalizerValue())).call();
3253  }
3254
3255  @Override
3256  public CompletableFuture<Boolean> isNormalizerEnabled() {
3257    return this
3258        .<Boolean> newMasterCaller()
3259        .action(
3260          (controller, stub) -> this
3261              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3262                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3263                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3264                (resp) -> resp.getEnabled())).call();
3265  }
3266
3267  @Override
3268  public CompletableFuture<Boolean> normalize() {
3269    return this
3270        .<Boolean> newMasterCaller()
3271        .action(
3272          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3273            controller, stub, RequestConverter.buildNormalizeRequest(),
3274            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3275        .call();
3276  }
3277
3278  @Override
3279  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3280    return this
3281        .<Boolean> newMasterCaller()
3282        .action(
3283          (controller, stub) -> this
3284              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3285                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3286                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3287                    .getPrevValue())).call();
3288  }
3289
3290  @Override
3291  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3292    return this
3293        .<Boolean> newMasterCaller()
3294        .action(
3295          (controller, stub) -> this
3296              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3297                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3298                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3299        .call();
3300  }
3301
3302  @Override
3303  public CompletableFuture<Boolean> runCleanerChore() {
3304    return this
3305        .<Boolean> newMasterCaller()
3306        .action(
3307          (controller, stub) -> this
3308              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3309                RequestConverter.buildRunCleanerChoreRequest(),
3310                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3311                (resp) -> resp.getCleanerChoreRan())).call();
3312  }
3313
3314  @Override
3315  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3316    return this
3317        .<Boolean> newMasterCaller()
3318        .action(
3319          (controller, stub) -> this
3320              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3321                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3322                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3323                    .getPrevValue())).call();
3324  }
3325
3326  @Override
3327  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3328    return this
3329        .<Boolean> newMasterCaller()
3330        .action(
3331          (controller, stub) -> this
3332              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3333                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3334                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3335                    .getValue())).call();
3336  }
3337
3338  @Override
3339  public CompletableFuture<Integer> runCatalogJanitor() {
3340    return this
3341        .<Integer> newMasterCaller()
3342        .action(
3343          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3344            controller, stub, RequestConverter.buildCatalogScanRequest(),
3345            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3346        .call();
3347  }
3348
3349  @Override
3350  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3351      ServiceCaller<S, R> callable) {
3352    MasterCoprocessorRpcChannelImpl channel =
3353        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3354    S stub = stubMaker.apply(channel);
3355    CompletableFuture<R> future = new CompletableFuture<>();
3356    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3357    callable.call(stub, controller, resp -> {
3358      if (controller.failed()) {
3359        future.completeExceptionally(controller.getFailed());
3360      } else {
3361        future.complete(resp);
3362      }
3363    });
3364    return future;
3365  }
3366
3367  @Override
3368  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3369      ServiceCaller<S, R> callable, ServerName serverName) {
3370    RegionServerCoprocessorRpcChannelImpl channel =
3371        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3372          serverName));
3373    S stub = stubMaker.apply(channel);
3374    CompletableFuture<R> future = new CompletableFuture<>();
3375    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3376    callable.call(stub, controller, resp -> {
3377      if (controller.failed()) {
3378        future.completeExceptionally(controller.getFailed());
3379      } else {
3380        future.complete(resp);
3381      }
3382    });
3383    return future;
3384  }
3385
3386  @Override
3387  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3388    return this.<List<ServerName>> newMasterCaller()
3389      .action((controller, stub) -> this
3390        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3391          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3392          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3393          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3394      .call();
3395  }
3396
3397  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3398    return this.connection.callerFactory.<T> serverRequest()
3399      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3400      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3401      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3402      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3403  }
3404
3405  @Override
3406  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3407    if (tableName == null) {
3408      return failedFuture(new IllegalArgumentException("Table name is null"));
3409    }
3410    CompletableFuture<Void> future = new CompletableFuture<>();
3411    addListener(tableExists(tableName), (exist, err) -> {
3412      if (err != null) {
3413        future.completeExceptionally(err);
3414        return;
3415      }
3416      if (!exist) {
3417        future.completeExceptionally(new TableNotFoundException(
3418          "Table '" + tableName.getNameAsString() + "' does not exists."));
3419        return;
3420      }
3421      addListener(getTableSplits(tableName), (splits, err1) -> {
3422        if (err1 != null) {
3423          future.completeExceptionally(err1);
3424        } else {
3425          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3426            if (err2 != null) {
3427              future.completeExceptionally(err2);
3428            } else {
3429              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3430                if (err3 != null) {
3431                  future.completeExceptionally(err3);
3432                } else {
3433                  future.complete(result3);
3434                }
3435              });
3436            }
3437          });
3438        }
3439      });
3440    });
3441    return future;
3442  }
3443
3444  @Override
3445  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3446    if (tableName == null) {
3447      return failedFuture(new IllegalArgumentException("Table name is null"));
3448    }
3449    CompletableFuture<Void> future = new CompletableFuture<>();
3450    addListener(tableExists(tableName), (exist, err) -> {
3451      if (err != null) {
3452        future.completeExceptionally(err);
3453        return;
3454      }
3455      if (!exist) {
3456        future.completeExceptionally(new TableNotFoundException(
3457          "Table '" + tableName.getNameAsString() + "' does not exists."));
3458        return;
3459      }
3460      addListener(setTableReplication(tableName, false), (result, err2) -> {
3461        if (err2 != null) {
3462          future.completeExceptionally(err2);
3463        } else {
3464          future.complete(result);
3465        }
3466      });
3467    });
3468    return future;
3469  }
3470
3471  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3472    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3473    addListener(
3474      getRegions(tableName).thenApply(regions -> regions.stream()
3475        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3476      (regions, err2) -> {
3477        if (err2 != null) {
3478          future.completeExceptionally(err2);
3479          return;
3480        }
3481        if (regions.size() == 1) {
3482          future.complete(null);
3483        } else {
3484          byte[][] splits = new byte[regions.size() - 1][];
3485          for (int i = 1; i < regions.size(); i++) {
3486            splits[i - 1] = regions.get(i).getStartKey();
3487          }
3488          future.complete(splits);
3489        }
3490      });
3491    return future;
3492  }
3493
3494  /**
3495   * Connect to peer and check the table descriptor on peer:
3496   * <ol>
3497   * <li>Create the same table on peer when not exist.</li>
3498   * <li>Throw an exception if the table already has replication enabled on any of the column
3499   * families.</li>
3500   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3501   * </ol>
3502   * @param tableName name of the table to sync to the peer
3503   * @param splits table split keys
3504   */
3505  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3506      byte[][] splits) {
3507    CompletableFuture<Void> future = new CompletableFuture<>();
3508    addListener(listReplicationPeers(), (peers, err) -> {
3509      if (err != null) {
3510        future.completeExceptionally(err);
3511        return;
3512      }
3513      if (peers == null || peers.size() <= 0) {
3514        future.completeExceptionally(
3515          new IllegalArgumentException("Found no peer cluster for replication."));
3516        return;
3517      }
3518      List<CompletableFuture<Void>> futures = new ArrayList<>();
3519      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3520        .forEach(peer -> {
3521          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3522        });
3523      addListener(
3524        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3525        (result, err2) -> {
3526          if (err2 != null) {
3527            future.completeExceptionally(err2);
3528          } else {
3529            future.complete(result);
3530          }
3531        });
3532    });
3533    return future;
3534  }
3535
3536  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3537      ReplicationPeerDescription peer) {
3538    Configuration peerConf = null;
3539    try {
3540      peerConf =
3541        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3542    } catch (IOException e) {
3543      return failedFuture(e);
3544    }
3545    CompletableFuture<Void> future = new CompletableFuture<>();
3546    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3547      if (err != null) {
3548        future.completeExceptionally(err);
3549        return;
3550      }
3551      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3552        if (err1 != null) {
3553          future.completeExceptionally(err1);
3554          return;
3555        }
3556        AsyncAdmin peerAdmin = conn.getAdmin();
3557        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3558          if (err2 != null) {
3559            future.completeExceptionally(err2);
3560            return;
3561          }
3562          if (!exist) {
3563            CompletableFuture<Void> createTableFuture = null;
3564            if (splits == null) {
3565              createTableFuture = peerAdmin.createTable(tableDesc);
3566            } else {
3567              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3568            }
3569            addListener(createTableFuture, (result, err3) -> {
3570              if (err3 != null) {
3571                future.completeExceptionally(err3);
3572              } else {
3573                future.complete(result);
3574              }
3575            });
3576          } else {
3577            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3578              (result, err4) -> {
3579                if (err4 != null) {
3580                  future.completeExceptionally(err4);
3581                } else {
3582                  future.complete(result);
3583                }
3584              });
3585          }
3586        });
3587      });
3588    });
3589    return future;
3590  }
3591
3592  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3593      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3594    CompletableFuture<Void> future = new CompletableFuture<>();
3595    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3596      if (err != null) {
3597        future.completeExceptionally(err);
3598        return;
3599      }
3600      if (peerTableDesc == null) {
3601        future.completeExceptionally(
3602          new IllegalArgumentException("Failed to get table descriptor for table " +
3603            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3604        return;
3605      }
3606      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3607        future.completeExceptionally(new IllegalArgumentException(
3608          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3609            ", but the table descriptors are not same when compared with source cluster." +
3610            " Thus can not enable the table's replication switch."));
3611        return;
3612      }
3613      future.complete(null);
3614    });
3615    return future;
3616  }
3617
3618  /**
3619   * Set the table's replication switch if the table's replication switch is already not set.
3620   * @param tableName name of the table
3621   * @param enableRep is replication switch enable or disable
3622   */
3623  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3624    CompletableFuture<Void> future = new CompletableFuture<>();
3625    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3626      if (err != null) {
3627        future.completeExceptionally(err);
3628        return;
3629      }
3630      if (!tableDesc.matchReplicationScope(enableRep)) {
3631        int scope =
3632          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3633        TableDescriptor newTableDesc =
3634          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3635        addListener(modifyTable(newTableDesc), (result, err2) -> {
3636          if (err2 != null) {
3637            future.completeExceptionally(err2);
3638          } else {
3639            future.complete(result);
3640          }
3641        });
3642      } else {
3643        future.complete(null);
3644      }
3645    });
3646    return future;
3647  }
3648
3649  @Override
3650  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3651    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3652    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3653      if (err != null) {
3654        future.completeExceptionally(err);
3655        return;
3656      }
3657      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3658        locations.stream().filter(l -> l.getRegion() != null)
3659          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3660          .collect(Collectors.groupingBy(l -> l.getServerName(),
3661            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3662      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3663      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3664      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3665        futures
3666          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3667            if (err2 != null) {
3668              future.completeExceptionally(unwrapCompletionException(err2));
3669            } else {
3670              aggregator.append(stats);
3671            }
3672          }));
3673      }
3674      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3675        (ret, err3) -> {
3676          if (err3 != null) {
3677            future.completeExceptionally(unwrapCompletionException(err3));
3678          } else {
3679            future.complete(aggregator.sum());
3680          }
3681        });
3682    });
3683    return future;
3684  }
3685
3686  @Override
3687  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3688      boolean preserveSplits) {
3689    CompletableFuture<Void> future = new CompletableFuture<>();
3690    addListener(tableExists(tableName), (exist, err) -> {
3691      if (err != null) {
3692        future.completeExceptionally(err);
3693        return;
3694      }
3695      if (!exist) {
3696        future.completeExceptionally(new TableNotFoundException(tableName));
3697        return;
3698      }
3699      addListener(tableExists(newTableName), (exist1, err1) -> {
3700        if (err1 != null) {
3701          future.completeExceptionally(err1);
3702          return;
3703        }
3704        if (exist1) {
3705          future.completeExceptionally(new TableExistsException(newTableName));
3706          return;
3707        }
3708        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3709          if (err2 != null) {
3710            future.completeExceptionally(err2);
3711            return;
3712          }
3713          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3714          if (preserveSplits) {
3715            addListener(getTableSplits(tableName), (splits, err3) -> {
3716              if (err3 != null) {
3717                future.completeExceptionally(err3);
3718              } else {
3719                addListener(
3720                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3721                  (result, err4) -> {
3722                    if (err4 != null) {
3723                      future.completeExceptionally(err4);
3724                    } else {
3725                      future.complete(result);
3726                    }
3727                  });
3728              }
3729            });
3730          } else {
3731            addListener(createTable(newTableDesc), (result, err5) -> {
3732              if (err5 != null) {
3733                future.completeExceptionally(err5);
3734              } else {
3735                future.complete(result);
3736              }
3737            });
3738          }
3739        });
3740      });
3741    });
3742    return future;
3743  }
3744
3745  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3746      List<RegionInfo> hris) {
3747    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3748      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3749        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3750        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3751        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3752      .serverName(serverName).call();
3753  }
3754
3755  @Override
3756  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3757    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3758        .action((controller, stub) -> this
3759            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3760              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3761              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3762              resp -> resp.getPreviousRpcThrottleEnabled()))
3763        .call();
3764    return future;
3765  }
3766
3767  @Override
3768  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3769    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3770        .action((controller, stub) -> this
3771            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3772              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3773              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3774              resp -> resp.getRpcThrottleEnabled()))
3775        .call();
3776    return future;
3777  }
3778
3779  @Override
3780  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3781    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3782        .action((controller, stub) -> this
3783            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3784              controller, stub,
3785              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3786                  .build(),
3787              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3788              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3789        .call();
3790    return future;
3791  }
3792
3793  @Override
3794  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3795    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3796      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3797      Map<TableName, Long>> call(controller, stub,
3798        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3799        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3800        resp -> resp.getSizesList().stream().collect(Collectors
3801          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3802      .call();
3803  }
3804
3805  @Override
3806  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3807      ServerName serverName) {
3808    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3809      .action((controller, stub) -> this
3810        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3811        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3812          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3813          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3814          resp -> resp.getSnapshotsList().stream()
3815            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3816              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3817      .serverName(serverName).call();
3818  }
3819
3820  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3821      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3822    return this.<SpaceQuotaSnapshot> newMasterCaller()
3823      .action((controller, stub) -> this
3824        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3825          RequestConverter.buildGetQuotaStatesRequest(),
3826          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3827      .call();
3828  }
3829
3830  @Override
3831  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3832    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3833      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3834      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3835  }
3836
3837  @Override
3838  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3839    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3840    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3841      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3842      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3843  }
3844
3845  @Override
3846  public CompletableFuture<Void> grant(UserPermission userPermission,
3847      boolean mergeExistingPermissions) {
3848    return this.<Void> newMasterCaller()
3849        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3850          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3851          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3852        .call();
3853  }
3854
3855  @Override
3856  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3857    return this.<Void> newMasterCaller()
3858        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3859          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3860          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3861        .call();
3862  }
3863
3864  @Override
3865  public CompletableFuture<List<UserPermission>>
3866      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3867    return this.<List<UserPermission>> newMasterCaller().action((controller,
3868        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3869            List<UserPermission>> call(controller, stub,
3870              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3871              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3872              resp -> resp.getUserPermissionList().stream()
3873                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3874                .collect(Collectors.toList())))
3875        .call();
3876  }
3877
3878  @Override
3879  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3880      List<Permission> permissions) {
3881    return this.<List<Boolean>> newMasterCaller()
3882        .action((controller, stub) -> this
3883            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3884              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3885              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3886              resp -> resp.getHasUserPermissionList()))
3887        .call();
3888  }
3889
3890  @Override
3891  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
3892      final boolean sync) {
3893    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3894        .call(controller, stub, RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
3895            MasterService.Interface::switchSnapshotCleanup,
3896            SetSnapshotCleanupResponse::getPrevSnapshotCleanup)).call();
3897  }
3898
3899  @Override
3900  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
3901    return this.<Boolean>newMasterCaller().action((controller, stub) -> this
3902        .call(controller, stub, RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
3903            MasterService.Interface::isSnapshotCleanupEnabled,
3904            IsSnapshotCleanupEnabledResponse::getEnabled)).call();
3905  }
3906
3907  @Override
3908  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
3909    return this.<Void> newMasterCaller()
3910        .action((controller, stub) -> this.
3911            <MoveServersRequest, MoveServersResponse, Void> call(controller, stub,
3912                RequestConverter.buildMoveServersRequest(servers, groupName),
3913              (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
3914        .call();
3915  }
3916
3917  @Override
3918  public CompletableFuture<Void> addRSGroup(String groupName) {
3919    return this.<Void> newMasterCaller()
3920        .action(((controller, stub) -> this.
3921            <AddRSGroupRequest, AddRSGroupResponse, Void> call(controller, stub,
3922                AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3923              (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)))
3924        .call();
3925  }
3926
3927  @Override
3928  public CompletableFuture<Void> removeRSGroup(String groupName) {
3929    return this.<Void> newMasterCaller()
3930        .action((controller, stub) -> this.
3931            <RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(controller, stub,
3932                RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3933              (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
3934        .call();
3935  }
3936
3937  @Override
3938  public CompletableFuture<Boolean> balanceRSGroup(String groupName) {
3939    return this.<Boolean> newMasterCaller()
3940        .action((controller, stub) -> this.
3941            <BalanceRSGroupRequest, BalanceRSGroupResponse, Boolean> call(controller, stub,
3942                BalanceRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
3943              (s, c, req, done) -> s.balanceRSGroup(c, req, done), resp -> resp.getBalanceRan()))
3944        .call();
3945  }
3946
3947  @Override
3948  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
3949    return this.<List<RSGroupInfo>> newMasterCaller()
3950        .action((controller, stub) -> this
3951            .<ListRSGroupInfosRequest, ListRSGroupInfosResponse, List<RSGroupInfo>> call(
3952                controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
3953              (s, c, req, done) -> s.listRSGroupInfos(c, req, done),
3954              resp -> resp.getRSGroupInfoList().stream()
3955                  .map(r -> ProtobufUtil.toGroupInfo(r))
3956                  .collect(Collectors.toList())))
3957        .call();
3958  }
3959
3960  @Override
3961  public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
3962      @Nullable final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
3963    if (CollectionUtils.isEmpty(serverNames)) {
3964      return CompletableFuture.completedFuture(Collections.emptyList());
3965    }
3966    if (logQueryFilter.getType() == null
3967        || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
3968      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3969        .map((ServerName serverName) ->
3970          getSlowLogResponseFromServer(serverName, logQueryFilter))
3971        .map(CompletableFuture::join)
3972        .flatMap(List::stream)
3973        .collect(Collectors.toList()));
3974    } else {
3975      return CompletableFuture.supplyAsync(() -> serverNames.stream()
3976        .map((ServerName serverName) ->
3977          getLargeLogResponseFromServer(serverName, logQueryFilter))
3978        .map(CompletableFuture::join)
3979        .flatMap(List::stream)
3980        .collect(Collectors.toList()));
3981    }
3982  }
3983
3984  private CompletableFuture<List<OnlineLogRecord>> getSlowLogResponseFromServer(
3985      final ServerName serverName, final LogQueryFilter logQueryFilter) {
3986    return this.<List<OnlineLogRecord>>newAdminCaller()
3987      .action((controller, stub) -> this
3988        .adminCall(
3989          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
3990          AdminService.Interface::getSlowLogResponses,
3991          ProtobufUtil::toSlowLogPayloads))
3992      .serverName(serverName).call();
3993  }
3994
3995  private CompletableFuture<List<OnlineLogRecord>> getLargeLogResponseFromServer(
3996    final ServerName serverName, final LogQueryFilter logQueryFilter) {
3997    return this.<List<OnlineLogRecord>>newAdminCaller()
3998      .action((controller, stub) -> this
3999        .adminCall(
4000          controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
4001          AdminService.Interface::getLargeLogResponses,
4002          ProtobufUtil::toSlowLogPayloads))
4003      .serverName(serverName).call();
4004  }
4005
4006  @Override
4007  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
4008      @Nullable Set<ServerName> serverNames) {
4009    if (CollectionUtils.isEmpty(serverNames)) {
4010      return CompletableFuture.completedFuture(Collections.emptyList());
4011    }
4012    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
4013      .map(this::clearSlowLogsResponses)
4014      .collect(Collectors.toList());
4015    return convertToFutureOfList(clearSlowLogResponseList);
4016  }
4017
4018  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4019    return this.<Boolean>newAdminCaller()
4020      .action(((controller, stub) -> this
4021        .adminCall(
4022          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
4023          AdminService.Interface::clearSlowLogsResponses,
4024          ProtobufUtil::toClearSlowLogPayload))
4025      ).serverName(serverName).call();
4026  }
4027
4028  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
4029    List<CompletableFuture<T>> futures) {
4030    CompletableFuture<Void> allDoneFuture =
4031      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4032    return allDoneFuture.thenApply(v ->
4033      futures.stream()
4034        .map(CompletableFuture::join)
4035        .collect(Collectors.toList())
4036    );
4037  }
4038
4039  @Override
4040  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4041    return this.<List<TableName>> newMasterCaller()
4042      .action((controller, stub) -> this
4043        .<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, List<TableName>> call(controller,
4044          stub, ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4045          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4046            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4047      .call();
4048  }
4049
4050  @Override
4051  public CompletableFuture<Pair<List<String>, List<TableName>>>
4052    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4053    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4054      .action((controller, stub) -> this
4055        .<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4056          GetConfiguredNamespacesAndTablesInRSGroupResponse,
4057          Pair<List<String>, List<TableName>>> call(controller, stub,
4058          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4059            .build(),
4060            (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4061            resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4062              .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4063      .call();
4064  }
4065
4066  @Override
4067  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4068    return this.<RSGroupInfo> newMasterCaller()
4069      .action(((controller, stub) -> this
4070        .<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse, RSGroupInfo> call(
4071          controller, stub,
4072          GetRSGroupInfoOfServerRequest.newBuilder()
4073            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4074              .setPort(hostPort.getPort()).build())
4075            .build(),
4076          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4077          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4078      .call();
4079  }
4080
4081  @Override
4082  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4083    return this.<Void> newMasterCaller()
4084      .action((controller, stub) -> this.
4085            <RemoveServersRequest, RemoveServersResponse, Void> call(controller, stub,
4086                RequestConverter.buildRemoveServersRequest(servers),
4087              (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4088        .call();
4089  }
4090
4091  @Override
4092  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4093    CompletableFuture<Void> future = new CompletableFuture<>();
4094    for (TableName tableName : tables) {
4095      addListener(tableExists(tableName), (exist, err) -> {
4096        if (err != null) {
4097          future.completeExceptionally(err);
4098          return;
4099        }
4100        if (!exist) {
4101          future.completeExceptionally(new TableNotFoundException(tableName));
4102          return;
4103        }
4104      });
4105    }
4106    addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> {
4107      if (err != null) {
4108        future.completeExceptionally(err);
4109        return;
4110      }
4111      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4112        future.complete(null);
4113        return;
4114      }
4115      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4116      for (TableDescriptor td : tableDescriptions) {
4117        newTableDescriptors
4118            .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4119      }
4120      addListener(CompletableFuture.allOf(
4121        newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4122        (v, e) -> {
4123          if (e != null) {
4124            future.completeExceptionally(e);
4125          } else {
4126            future.complete(v);
4127          }
4128        });
4129    }));
4130    return future;
4131  }
4132
4133  @Override
4134  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4135    return this.<RSGroupInfo> newMasterCaller().action(((controller, stub) -> this
4136      .<GetRSGroupInfoOfTableRequest, GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller,
4137        stub,
4138        GetRSGroupInfoOfTableRequest.newBuilder().setTableName(ProtobufUtil.toProtoTableName(table))
4139          .build(),
4140        (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4141        resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4142      .call();
4143  }
4144
4145  @Override
4146  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4147    return this.<RSGroupInfo> newMasterCaller()
4148      .action(((controller, stub) -> this
4149        .<GetRSGroupInfoRequest, GetRSGroupInfoResponse, RSGroupInfo> call(controller, stub,
4150          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4151          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4152          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4153      .call();
4154  }
4155
4156  @Override
4157  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4158    return this.<Void> newMasterCaller()
4159      .action(
4160        (
4161          (controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4162            controller,
4163            stub,
4164            RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName).setNewRsgroupName(newName)
4165                                .build(),
4166            (s, c, req, done) -> s.renameRSGroup(c, req, done),
4167            resp -> null
4168          )
4169        )
4170      ).call();
4171  }
4172
4173  @Override
4174  public CompletableFuture<Void>
4175    updateRSGroupConfig(String groupName, Map<String, String> configuration) {
4176    UpdateRSGroupConfigRequest.Builder request = UpdateRSGroupConfigRequest.newBuilder()
4177        .setGroupName(groupName);
4178    if (configuration != null) {
4179      configuration.entrySet().forEach(e ->
4180          request.addConfiguration(NameStringPair.newBuilder().setName(e.getKey())
4181              .setValue(e.getValue()).build()));
4182    }
4183    return this.<Void> newMasterCaller()
4184        .action(((controller, stub) ->
4185            this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
4186                controller, stub, request.build(),
4187              (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4188        ).call();
4189  }
4190}