001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import com.google.protobuf.Message;
026import com.google.protobuf.RpcChannel;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
052import org.apache.hadoop.hbase.CacheEvictionStats;
053import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MetaTableAccessor;
060import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
061import org.apache.hadoop.hbase.NamespaceDescriptor;
062import org.apache.hadoop.hbase.RegionLocations;
063import org.apache.hadoop.hbase.RegionMetrics;
064import org.apache.hadoop.hbase.RegionMetricsBuilder;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableExistsException;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.TableNotDisabledException;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.UnknownRegionException;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
075import org.apache.hadoop.hbase.client.Scan.ReadType;
076import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
077import org.apache.hadoop.hbase.client.replication.TableCFs;
078import org.apache.hadoop.hbase.client.security.SecurityCapability;
079import org.apache.hadoop.hbase.exceptions.DeserializationException;
080import org.apache.hadoop.hbase.ipc.HBaseRpcController;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.yetus.audience.InterfaceAudience;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
104import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
105import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
106import org.apache.hbase.thirdparty.io.netty.util.Timeout;
107import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
108
109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
110import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
296
297/**
298 * The implementation of AsyncAdmin.
299 * <p>
300 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
301 * be finished inside the rpc framework thread, which means that the callbacks registered to the
302 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
303 * this class should not try to do time consuming tasks in the callbacks.
304 * @since 2.0.0
305 * @see AsyncHBaseAdmin
306 * @see AsyncConnection#getAdmin()
307 * @see AsyncConnection#getAdminBuilder()
308 */
309@InterfaceAudience.Private
310class RawAsyncHBaseAdmin implements AsyncAdmin {
311  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
312
313  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
314
315  private final AsyncConnectionImpl connection;
316
317  private final HashedWheelTimer retryTimer;
318
319  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
320
321  private final long rpcTimeoutNs;
322
323  private final long operationTimeoutNs;
324
325  private final long pauseNs;
326
327  private final long pauseForCQTBENs;
328
329  private final int maxAttempts;
330
331  private final int startLogErrorsCnt;
332
333  private final NonceGenerator ng;
334
335  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
336      AsyncAdminBuilderBase builder) {
337    this.connection = connection;
338    this.retryTimer = retryTimer;
339    this.metaTable = connection.getTable(META_TABLE_NAME);
340    this.rpcTimeoutNs = builder.rpcTimeoutNs;
341    this.operationTimeoutNs = builder.operationTimeoutNs;
342    this.pauseNs = builder.pauseNs;
343    if (builder.pauseForCQTBENs < builder.pauseNs) {
344      LOG.warn(
345        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
346          " the normal pause value {} ms, use the greater one instead",
347        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
348        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
349      this.pauseForCQTBENs = builder.pauseNs;
350    } else {
351      this.pauseForCQTBENs = builder.pauseForCQTBENs;
352    }
353    this.maxAttempts = builder.maxAttempts;
354    this.startLogErrorsCnt = builder.startLogErrorsCnt;
355    this.ng = connection.getNonceGenerator();
356  }
357
358  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
359    return this.connection.callerFactory.<T> masterRequest()
360      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
361      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
362      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
363      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
364  }
365
366  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
367    return this.connection.callerFactory.<T> adminRequest()
368      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
369      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
370      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
371      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
372  }
373
374  @FunctionalInterface
375  private interface MasterRpcCall<RESP, REQ> {
376    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
377        RpcCallback<RESP> done);
378  }
379
380  @FunctionalInterface
381  private interface AdminRpcCall<RESP, REQ> {
382    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
383        RpcCallback<RESP> done);
384  }
385
386  @FunctionalInterface
387  private interface Converter<D, S> {
388    D convert(S src) throws IOException;
389  }
390
391  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
392      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
393      Converter<RESP, PRESP> respConverter) {
394    CompletableFuture<RESP> future = new CompletableFuture<>();
395    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
396
397      @Override
398      public void run(PRESP resp) {
399        if (controller.failed()) {
400          future.completeExceptionally(controller.getFailed());
401        } else {
402          try {
403            future.complete(respConverter.convert(resp));
404          } catch (IOException e) {
405            future.completeExceptionally(e);
406          }
407        }
408      }
409    });
410    return future;
411  }
412
413  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
414      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
415      Converter<RESP, PRESP> respConverter) {
416    CompletableFuture<RESP> future = new CompletableFuture<>();
417    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
418
419      @Override
420      public void run(PRESP resp) {
421        if (controller.failed()) {
422          future.completeExceptionally(controller.getFailed());
423        } else {
424          try {
425            future.complete(respConverter.convert(resp));
426          } catch (IOException e) {
427            future.completeExceptionally(e);
428          }
429        }
430      }
431    });
432    return future;
433  }
434
435  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
436      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
437      ProcedureBiConsumer consumer) {
438    return procedureCall(b -> {
439    }, preq, rpcCall, respConverter, consumer);
440  }
441
442  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
443      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
444      ProcedureBiConsumer consumer) {
445    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
446  }
447
448  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
449      Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
450      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
451      ProcedureBiConsumer consumer) {
452    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
453        stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
454    prioritySetter.accept(builder);
455    CompletableFuture<Long> procFuture = builder.call();
456    CompletableFuture<Void> future = waitProcedureResult(procFuture);
457    addListener(future, consumer);
458    return future;
459  }
460
461  @FunctionalInterface
462  private interface TableOperator {
463    CompletableFuture<Void> operate(TableName table);
464  }
465
466  @Override
467  public CompletableFuture<Boolean> tableExists(TableName tableName) {
468    if (TableName.isMetaTableName(tableName)) {
469      return CompletableFuture.completedFuture(true);
470    }
471    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
472  }
473
474  @Override
475  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
476    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
477      includeSysTables));
478  }
479
480  /**
481   * {@link #listTableDescriptors(boolean)}
482   */
483  @Override
484  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
485      boolean includeSysTables) {
486    Preconditions.checkNotNull(pattern,
487      "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
488    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
489      includeSysTables));
490  }
491
492  @Override
493  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
494    Preconditions.checkNotNull(tableNames,
495      "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
496    if (tableNames.isEmpty()) {
497      return CompletableFuture.completedFuture(Collections.emptyList());
498    }
499    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
500  }
501
502  private CompletableFuture<List<TableDescriptor>>
503      getTableDescriptors(GetTableDescriptorsRequest request) {
504    return this.<List<TableDescriptor>> newMasterCaller()
505        .action((controller, stub) -> this
506            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
507              controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
508              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
509        .call();
510  }
511
512  @Override
513  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
514    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
515  }
516
517  @Override
518  public CompletableFuture<List<TableName>>
519      listTableNames(Pattern pattern, boolean includeSysTables) {
520    Preconditions.checkNotNull(pattern,
521        "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
522    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
523  }
524
525  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
526    return this
527        .<List<TableName>> newMasterCaller()
528        .action(
529          (controller, stub) -> this
530              .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
531                stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
532                (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
533  }
534
535  @Override
536  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
537    return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
538        .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
539        List<TableDescriptor>> call(
540          controller, stub,
541          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
542          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
543          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
544        .call();
545  }
546
547  @Override
548  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
549    return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
550        .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
551        List<TableName>> call(
552          controller, stub,
553          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
554          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
555          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
556        .call();
557  }
558
559  @Override
560  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
561    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
562    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
563      .action((controller, stub) -> this
564        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
565          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
566          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
567          (resp) -> resp.getTableSchemaList()))
568      .call(), (tableSchemas, error) -> {
569        if (error != null) {
570          future.completeExceptionally(error);
571          return;
572        }
573        if (!tableSchemas.isEmpty()) {
574          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
575        } else {
576          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
577        }
578      });
579    return future;
580  }
581
582  @Override
583  public CompletableFuture<Void> createTable(TableDescriptor desc) {
584    return createTable(desc.getTableName(),
585      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
586  }
587
588  @Override
589  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
590      int numRegions) {
591    try {
592      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
593    } catch (IllegalArgumentException e) {
594      return failedFuture(e);
595    }
596  }
597
598  @Override
599  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
600    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
601        + " use createTable(TableDescriptor) instead");
602    try {
603      verifySplitKeys(splitKeys);
604      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
605        splitKeys, ng.getNonceGroup(), ng.newNonce()));
606    } catch (IllegalArgumentException e) {
607      return failedFuture(e);
608    }
609  }
610
611  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
612    Preconditions.checkNotNull(tableName, "table name is null");
613    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
614      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
615      new CreateTableProcedureBiConsumer(tableName));
616  }
617
618  @Override
619  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
620    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
621      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
622        ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
623      (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
624  }
625
626  @Override
627  public CompletableFuture<Void> deleteTable(TableName tableName) {
628    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
629      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
630      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
631      new DeleteTableProcedureBiConsumer(tableName));
632  }
633
634  @Override
635  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
636    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
637      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
638        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
639      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
640  }
641
642  @Override
643  public CompletableFuture<Void> enableTable(TableName tableName) {
644    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
645      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
646      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
647      new EnableTableProcedureBiConsumer(tableName));
648  }
649
650  @Override
651  public CompletableFuture<Void> disableTable(TableName tableName) {
652    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
653      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
654      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
655      new DisableTableProcedureBiConsumer(tableName));
656  }
657
658  @Override
659  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
660    if (TableName.isMetaTableName(tableName)) {
661      return CompletableFuture.completedFuture(true);
662    }
663    CompletableFuture<Boolean> future = new CompletableFuture<>();
664    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
665      if (error != null) {
666        future.completeExceptionally(error);
667        return;
668      }
669      if (state.isPresent()) {
670        future.complete(state.get().inStates(TableState.State.ENABLED));
671      } else {
672        future.completeExceptionally(new TableNotFoundException(tableName));
673      }
674    });
675    return future;
676  }
677
678  @Override
679  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
680    if (TableName.isMetaTableName(tableName)) {
681      return CompletableFuture.completedFuture(false);
682    }
683    CompletableFuture<Boolean> future = new CompletableFuture<>();
684    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
685      if (error != null) {
686        future.completeExceptionally(error);
687        return;
688      }
689      if (state.isPresent()) {
690        future.complete(state.get().inStates(TableState.State.DISABLED));
691      } else {
692        future.completeExceptionally(new TableNotFoundException(tableName));
693      }
694    });
695    return future;
696  }
697
698  @Override
699  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
700    return isTableAvailable(tableName, Optional.empty());
701  }
702
703  @Override
704  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
705    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
706        + " use isTableAvailable(TableName) instead");
707    return isTableAvailable(tableName, Optional.of(splitKeys));
708  }
709
710  private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
711      Optional<byte[][]> splitKeys) {
712    if (TableName.isMetaTableName(tableName)) {
713      return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
714        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
715    }
716    CompletableFuture<Boolean> future = new CompletableFuture<>();
717    addListener(isTableEnabled(tableName), (enabled, error) -> {
718      if (error != null) {
719        if (error instanceof TableNotFoundException) {
720          future.complete(false);
721        } else {
722          future.completeExceptionally(error);
723        }
724        return;
725      }
726      if (!enabled) {
727        future.complete(false);
728      } else {
729        addListener(
730          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
731          (locations, error1) -> {
732            if (error1 != null) {
733              future.completeExceptionally(error1);
734              return;
735            }
736            List<HRegionLocation> notDeployedRegions = locations.stream()
737              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
738            if (notDeployedRegions.size() > 0) {
739              if (LOG.isDebugEnabled()) {
740                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
741              }
742              future.complete(false);
743              return;
744            }
745
746            Optional<Boolean> available =
747              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
748            future.complete(available.orElse(true));
749          });
750      }
751    });
752    return future;
753  }
754
755  private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
756    int regionCount = 0;
757    for (HRegionLocation location : locations) {
758      RegionInfo info = location.getRegion();
759      if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
760        regionCount++;
761        continue;
762      }
763      for (byte[] splitKey : splitKeys) {
764        // Just check if the splitkey is available
765        if (Bytes.equals(info.getStartKey(), splitKey)) {
766          regionCount++;
767          break;
768        }
769      }
770    }
771    return regionCount == splitKeys.length + 1;
772  }
773
774  @Override
775  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
776    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
777      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
778        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
779      new AddColumnFamilyProcedureBiConsumer(tableName));
780  }
781
782  @Override
783  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
784    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
785      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
786        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
787      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
788  }
789
790  @Override
791  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
792      ColumnFamilyDescriptor columnFamily) {
793    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
794      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
795        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
796      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
797  }
798
799  @Override
800  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
801    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
802      RequestConverter.buildCreateNamespaceRequest(descriptor),
803      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
804      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
805  }
806
807  @Override
808  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
809    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
810      RequestConverter.buildModifyNamespaceRequest(descriptor),
811      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
812      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
813  }
814
815  @Override
816  public CompletableFuture<Void> deleteNamespace(String name) {
817    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
818      RequestConverter.buildDeleteNamespaceRequest(name),
819      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
820      new DeleteNamespaceProcedureBiConsumer(name));
821  }
822
823  @Override
824  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
825    return this
826        .<NamespaceDescriptor> newMasterCaller()
827        .action(
828          (controller, stub) -> this
829              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
830                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
831                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
832                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
833  }
834
835  @Override
836  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
837    return this
838        .<List<NamespaceDescriptor>> newMasterCaller()
839        .action(
840          (controller, stub) -> this
841              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
842                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
843                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
844                    .toNamespaceDescriptorList(resp))).call();
845  }
846
847  @Override
848  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
849    return this.<List<RegionInfo>> newAdminCaller()
850        .action((controller, stub) -> this
851            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
852              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
853              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
854              resp -> ProtobufUtil.getRegionInfos(resp)))
855        .serverName(serverName).call();
856  }
857
858  @Override
859  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
860    if (tableName.equals(META_TABLE_NAME)) {
861      return connection.registry.getMetaRegionLocation()
862        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
863          .collect(Collectors.toList()));
864    } else {
865      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
866        .thenApply(
867          locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
868    }
869  }
870
871  @Override
872  public CompletableFuture<Void> flush(TableName tableName) {
873    CompletableFuture<Void> future = new CompletableFuture<>();
874    addListener(tableExists(tableName), (exists, err) -> {
875      if (err != null) {
876        future.completeExceptionally(err);
877      } else if (!exists) {
878        future.completeExceptionally(new TableNotFoundException(tableName));
879      } else {
880        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
881          if (err2 != null) {
882            future.completeExceptionally(err2);
883          } else if (!tableEnabled) {
884            future.completeExceptionally(new TableNotEnabledException(tableName));
885          } else {
886            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
887              new HashMap<>()), (ret, err3) -> {
888                if (err3 != null) {
889                  future.completeExceptionally(err3);
890                } else {
891                  future.complete(ret);
892                }
893              });
894          }
895        });
896      }
897    });
898    return future;
899  }
900
901  @Override
902  public CompletableFuture<Void> flushRegion(byte[] regionName) {
903    CompletableFuture<Void> future = new CompletableFuture<>();
904    addListener(getRegionLocation(regionName), (location, err) -> {
905      if (err != null) {
906        future.completeExceptionally(err);
907        return;
908      }
909      ServerName serverName = location.getServerName();
910      if (serverName == null) {
911        future
912          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
913        return;
914      }
915      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
916        if (err2 != null) {
917          future.completeExceptionally(err2);
918        } else {
919          future.complete(ret);
920        }
921      });
922    });
923    return future;
924  }
925
926  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
927    return this.<Void> newAdminCaller()
928            .serverName(serverName)
929            .action(
930              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
931                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
932                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
933                resp -> null))
934            .call();
935  }
936
937  @Override
938  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
939    CompletableFuture<Void> future = new CompletableFuture<>();
940    addListener(getRegions(sn), (hRegionInfos, err) -> {
941      if (err != null) {
942        future.completeExceptionally(err);
943        return;
944      }
945      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
946      if (hRegionInfos != null) {
947        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
948      }
949      addListener(CompletableFuture.allOf(
950        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
951          if (err2 != null) {
952            future.completeExceptionally(err2);
953          } else {
954            future.complete(ret);
955          }
956        });
957    });
958    return future;
959  }
960
961  @Override
962  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
963    return compact(tableName, null, false, compactType);
964  }
965
966  @Override
967  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
968      CompactType compactType) {
969    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
970        + "If you don't specify a columnFamily, use compact(TableName) instead");
971    return compact(tableName, columnFamily, false, compactType);
972  }
973
974  @Override
975  public CompletableFuture<Void> compactRegion(byte[] regionName) {
976    return compactRegion(regionName, null, false);
977  }
978
979  @Override
980  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
981    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
982        + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
983    return compactRegion(regionName, columnFamily, false);
984  }
985
986  @Override
987  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
988    return compact(tableName, null, true, compactType);
989  }
990
991  @Override
992  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
993      CompactType compactType) {
994    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
995        + "If you don't specify a columnFamily, use compact(TableName) instead");
996    return compact(tableName, columnFamily, true, compactType);
997  }
998
999  @Override
1000  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1001    return compactRegion(regionName, null, true);
1002  }
1003
1004  @Override
1005  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1006    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1007        + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1008    return compactRegion(regionName, columnFamily, true);
1009  }
1010
1011  @Override
1012  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1013    return compactRegionServer(sn, false);
1014  }
1015
1016  @Override
1017  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1018    return compactRegionServer(sn, true);
1019  }
1020
1021  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1022    CompletableFuture<Void> future = new CompletableFuture<>();
1023    addListener(getRegions(sn), (hRegionInfos, err) -> {
1024      if (err != null) {
1025        future.completeExceptionally(err);
1026        return;
1027      }
1028      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1029      if (hRegionInfos != null) {
1030        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1031      }
1032      addListener(CompletableFuture.allOf(
1033        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1034          if (err2 != null) {
1035            future.completeExceptionally(err2);
1036          } else {
1037            future.complete(ret);
1038          }
1039        });
1040    });
1041    return future;
1042  }
1043
1044  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1045      boolean major) {
1046    CompletableFuture<Void> future = new CompletableFuture<>();
1047    addListener(getRegionLocation(regionName), (location, err) -> {
1048      if (err != null) {
1049        future.completeExceptionally(err);
1050        return;
1051      }
1052      ServerName serverName = location.getServerName();
1053      if (serverName == null) {
1054        future
1055          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1056        return;
1057      }
1058      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1059        (ret, err2) -> {
1060          if (err2 != null) {
1061            future.completeExceptionally(err2);
1062          } else {
1063            future.complete(ret);
1064          }
1065        });
1066    });
1067    return future;
1068  }
1069
1070  /**
1071   * List all region locations for the specific table.
1072   */
1073  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1074    if (TableName.META_TABLE_NAME.equals(tableName)) {
1075      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1076      // For meta table, we use zk to fetch all locations.
1077      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
1078      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
1079        if (err != null) {
1080          future.completeExceptionally(err);
1081        } else if (metaRegions == null || metaRegions.isEmpty() ||
1082          metaRegions.getDefaultRegionLocation() == null) {
1083          future.completeExceptionally(new IOException("meta region does not found"));
1084        } else {
1085          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1086        }
1087        // close the registry.
1088        IOUtils.closeQuietly(registry);
1089      });
1090      return future;
1091    } else {
1092      // For non-meta table, we fetch all locations by scanning hbase:meta table
1093      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1094    }
1095  }
1096
1097  /**
1098   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1099   */
1100  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1101      CompactType compactType) {
1102    CompletableFuture<Void> future = new CompletableFuture<>();
1103
1104    switch (compactType) {
1105      case MOB:
1106        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
1107          if (err != null) {
1108            future.completeExceptionally(err);
1109            return;
1110          }
1111          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1112          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1113            if (err2 != null) {
1114              future.completeExceptionally(err2);
1115            } else {
1116              future.complete(ret);
1117            }
1118          });
1119        });
1120        break;
1121      case NORMAL:
1122        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1123          if (err != null) {
1124            future.completeExceptionally(err);
1125            return;
1126          }
1127          if (locations == null || locations.isEmpty()) {
1128            future.completeExceptionally(new TableNotFoundException(tableName));
1129          }
1130          CompletableFuture<?>[] compactFutures =
1131            locations.stream().filter(l -> l.getRegion() != null)
1132              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1133              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1134              .toArray(CompletableFuture<?>[]::new);
1135          // future complete unless all of the compact futures are completed.
1136          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1137            if (err2 != null) {
1138              future.completeExceptionally(err2);
1139            } else {
1140              future.complete(ret);
1141            }
1142          });
1143        });
1144        break;
1145      default:
1146        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1147    }
1148    return future;
1149  }
1150
1151  /**
1152   * Compact the region at specific region server.
1153   */
1154  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1155      final boolean major, byte[] columnFamily) {
1156    return this
1157        .<Void> newAdminCaller()
1158        .serverName(sn)
1159        .action(
1160          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1161            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1162              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1163            resp -> null)).call();
1164  }
1165
1166  private byte[] toEncodeRegionName(byte[] regionName) {
1167    try {
1168      return RegionInfo.isEncodedRegionName(regionName) ? regionName
1169          : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1170    } catch (IOException e) {
1171      return regionName;
1172    }
1173  }
1174
1175  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1176      CompletableFuture<TableName> result) {
1177    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1178      if (err != null) {
1179        result.completeExceptionally(err);
1180        return;
1181      }
1182      RegionInfo regionInfo = location.getRegion();
1183      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1184        result.completeExceptionally(
1185          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1186        return;
1187      }
1188      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1189        if (!tableName.get().equals(regionInfo.getTable())) {
1190          // tables of this two region should be same.
1191          result.completeExceptionally(
1192            new IllegalArgumentException("Cannot merge regions from two different tables " +
1193              tableName.get() + " and " + regionInfo.getTable()));
1194        } else {
1195          result.complete(tableName.get());
1196        }
1197      }
1198    });
1199  }
1200
1201  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1202    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1203    CompletableFuture<TableName> future = new CompletableFuture<>();
1204    for (byte[] encodedRegionName : encodedRegionNames) {
1205      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1206    }
1207    return future;
1208  }
1209
1210  @Override
1211  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1212    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1213  }
1214
1215  @Override
1216  public CompletableFuture<Boolean> isMergeEnabled() {
1217    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1218  }
1219
1220  @Override
1221  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1222    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1223  }
1224
1225  @Override
1226  public CompletableFuture<Boolean> isSplitEnabled() {
1227    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1228  }
1229
1230  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1231      MasterSwitchType switchType) {
1232    SetSplitOrMergeEnabledRequest request =
1233      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1234    return this.<Boolean> newMasterCaller()
1235      .action((controller, stub) -> this
1236        .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1237          stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1238          (resp) -> resp.getPrevValueList().get(0)))
1239      .call();
1240  }
1241
1242  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1243    IsSplitOrMergeEnabledRequest request =
1244        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1245    return this
1246        .<Boolean> newMasterCaller()
1247        .action(
1248          (controller, stub) -> this
1249              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1250                controller, stub, request,
1251                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1252                (resp) -> resp.getEnabled())).call();
1253  }
1254
1255  @Override
1256  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1257    if (nameOfRegionsToMerge.size() < 2) {
1258      return failedFuture(new IllegalArgumentException(
1259        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1260    }
1261    CompletableFuture<Void> future = new CompletableFuture<>();
1262    byte[][] encodedNameOfRegionsToMerge =
1263      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1264
1265    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1266      if (err != null) {
1267        future.completeExceptionally(err);
1268        return;
1269      }
1270
1271      MergeTableRegionsRequest request = null;
1272      try {
1273        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1274          forcible, ng.getNonceGroup(), ng.newNonce());
1275      } catch (DeserializationException e) {
1276        future.completeExceptionally(e);
1277        return;
1278      }
1279
1280      addListener(
1281        this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
1282          (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1283          new MergeTableRegionProcedureBiConsumer(tableName)),
1284        (ret, err2) -> {
1285          if (err2 != null) {
1286            future.completeExceptionally(err2);
1287          } else {
1288            future.complete(ret);
1289          }
1290        });
1291    });
1292    return future;
1293  }
1294
1295  @Override
1296  public CompletableFuture<Void> split(TableName tableName) {
1297    CompletableFuture<Void> future = new CompletableFuture<>();
1298    addListener(tableExists(tableName), (exist, error) -> {
1299      if (error != null) {
1300        future.completeExceptionally(error);
1301        return;
1302      }
1303      if (!exist) {
1304        future.completeExceptionally(new TableNotFoundException(tableName));
1305        return;
1306      }
1307      addListener(
1308        metaTable
1309          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1310            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1311            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1312        (results, err2) -> {
1313          if (err2 != null) {
1314            future.completeExceptionally(err2);
1315            return;
1316          }
1317          if (results != null && !results.isEmpty()) {
1318            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1319            for (Result r : results) {
1320              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1321                continue;
1322              }
1323              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1324              if (rl != null) {
1325                for (HRegionLocation h : rl.getRegionLocations()) {
1326                  if (h != null && h.getServerName() != null) {
1327                    RegionInfo hri = h.getRegion();
1328                    if (hri == null || hri.isSplitParent() ||
1329                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1330                      continue;
1331                    }
1332                    splitFutures.add(split(hri, null));
1333                  }
1334                }
1335              }
1336            }
1337            addListener(
1338              CompletableFuture
1339                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1340              (ret, exception) -> {
1341                if (exception != null) {
1342                  future.completeExceptionally(exception);
1343                  return;
1344                }
1345                future.complete(ret);
1346              });
1347          } else {
1348            future.complete(null);
1349          }
1350        });
1351    });
1352    return future;
1353  }
1354
1355  @Override
1356  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1357    CompletableFuture<Void> result = new CompletableFuture<>();
1358    if (splitPoint == null) {
1359      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1360    }
1361    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1362      (loc, err) -> {
1363        if (err != null) {
1364          result.completeExceptionally(err);
1365        } else if (loc == null || loc.getRegion() == null) {
1366          result.completeExceptionally(new IllegalArgumentException(
1367            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1368        } else {
1369          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1370            if (err2 != null) {
1371              result.completeExceptionally(err2);
1372            } else {
1373              result.complete(ret);
1374            }
1375
1376          });
1377        }
1378      });
1379    return result;
1380  }
1381
1382  @Override
1383  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1384    CompletableFuture<Void> future = new CompletableFuture<>();
1385    addListener(getRegionLocation(regionName), (location, err) -> {
1386      if (err != null) {
1387        future.completeExceptionally(err);
1388        return;
1389      }
1390      RegionInfo regionInfo = location.getRegion();
1391      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1392        future
1393          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1394            "Replicas are auto-split when their primary is split."));
1395        return;
1396      }
1397      ServerName serverName = location.getServerName();
1398      if (serverName == null) {
1399        future
1400          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1401        return;
1402      }
1403      addListener(split(regionInfo, null), (ret, err2) -> {
1404        if (err2 != null) {
1405          future.completeExceptionally(err2);
1406        } else {
1407          future.complete(ret);
1408        }
1409      });
1410    });
1411    return future;
1412  }
1413
1414  @Override
1415  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1416    Preconditions.checkNotNull(splitPoint,
1417      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1418    CompletableFuture<Void> future = new CompletableFuture<>();
1419    addListener(getRegionLocation(regionName), (location, err) -> {
1420      if (err != null) {
1421        future.completeExceptionally(err);
1422        return;
1423      }
1424      RegionInfo regionInfo = location.getRegion();
1425      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1426        future
1427          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1428            "Replicas are auto-split when their primary is split."));
1429        return;
1430      }
1431      ServerName serverName = location.getServerName();
1432      if (serverName == null) {
1433        future
1434          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1435        return;
1436      }
1437      if (regionInfo.getStartKey() != null &&
1438        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1439        future.completeExceptionally(
1440          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1441        return;
1442      }
1443      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1444        if (err2 != null) {
1445          future.completeExceptionally(err2);
1446        } else {
1447          future.complete(ret);
1448        }
1449      });
1450    });
1451    return future;
1452  }
1453
1454  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1455    CompletableFuture<Void> future = new CompletableFuture<>();
1456    TableName tableName = hri.getTable();
1457    SplitTableRegionRequest request = null;
1458    try {
1459      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1460        ng.newNonce());
1461    } catch (DeserializationException e) {
1462      future.completeExceptionally(e);
1463      return future;
1464    }
1465
1466    addListener(
1467      this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
1468        request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1469        new SplitTableRegionProcedureBiConsumer(tableName)),
1470      (ret, err2) -> {
1471        if (err2 != null) {
1472          future.completeExceptionally(err2);
1473        } else {
1474          future.complete(ret);
1475        }
1476      });
1477    return future;
1478  }
1479
1480  @Override
1481  public CompletableFuture<Void> assign(byte[] regionName) {
1482    CompletableFuture<Void> future = new CompletableFuture<>();
1483    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1484      if (err != null) {
1485        future.completeExceptionally(err);
1486        return;
1487      }
1488      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1489        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1490          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1491          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1492        .call(), (ret, err2) -> {
1493          if (err2 != null) {
1494            future.completeExceptionally(err2);
1495          } else {
1496            future.complete(ret);
1497          }
1498        });
1499    });
1500    return future;
1501  }
1502
1503  @Override
1504  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1505    CompletableFuture<Void> future = new CompletableFuture<>();
1506    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1507      if (err != null) {
1508        future.completeExceptionally(err);
1509        return;
1510      }
1511      addListener(
1512        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1513          .action(((controller, stub) -> this
1514            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1515              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1516              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1517          .call(),
1518        (ret, err2) -> {
1519          if (err2 != null) {
1520            future.completeExceptionally(err2);
1521          } else {
1522            future.complete(ret);
1523          }
1524        });
1525    });
1526    return future;
1527  }
1528
1529  @Override
1530  public CompletableFuture<Void> offline(byte[] regionName) {
1531    CompletableFuture<Void> future = new CompletableFuture<>();
1532    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1533      if (err != null) {
1534        future.completeExceptionally(err);
1535        return;
1536      }
1537      addListener(
1538        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1539          .action(((controller, stub) -> this
1540            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1541              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1542              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1543          .call(),
1544        (ret, err2) -> {
1545          if (err2 != null) {
1546            future.completeExceptionally(err2);
1547          } else {
1548            future.complete(ret);
1549          }
1550        });
1551    });
1552    return future;
1553  }
1554
1555  @Override
1556  public CompletableFuture<Void> move(byte[] regionName) {
1557    CompletableFuture<Void> future = new CompletableFuture<>();
1558    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1559      if (err != null) {
1560        future.completeExceptionally(err);
1561        return;
1562      }
1563      addListener(
1564        moveRegion(regionInfo,
1565          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1566        (ret, err2) -> {
1567          if (err2 != null) {
1568            future.completeExceptionally(err2);
1569          } else {
1570            future.complete(ret);
1571          }
1572        });
1573    });
1574    return future;
1575  }
1576
1577  @Override
1578  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1579    Preconditions.checkNotNull(destServerName,
1580      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1581    CompletableFuture<Void> future = new CompletableFuture<>();
1582    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1583      if (err != null) {
1584        future.completeExceptionally(err);
1585        return;
1586      }
1587      addListener(
1588        moveRegion(regionInfo, RequestConverter
1589          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1590        (ret, err2) -> {
1591          if (err2 != null) {
1592            future.completeExceptionally(err2);
1593          } else {
1594            future.complete(ret);
1595          }
1596        });
1597    });
1598    return future;
1599  }
1600
1601  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1602    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1603      .action(
1604        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1605          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1606      .call();
1607  }
1608
1609  @Override
1610  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1611    return this
1612        .<Void> newMasterCaller()
1613        .action(
1614          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1615            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1616            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1617  }
1618
1619  @Override
1620  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1621    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1622    Scan scan = QuotaTableUtil.makeScan(filter);
1623    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1624        .scan(scan, new AdvancedScanResultConsumer() {
1625          List<QuotaSettings> settings = new ArrayList<>();
1626
1627          @Override
1628          public void onNext(Result[] results, ScanController controller) {
1629            for (Result result : results) {
1630              try {
1631                QuotaTableUtil.parseResultToCollection(result, settings);
1632              } catch (IOException e) {
1633                controller.terminate();
1634                future.completeExceptionally(e);
1635              }
1636            }
1637          }
1638
1639          @Override
1640          public void onError(Throwable error) {
1641            future.completeExceptionally(error);
1642          }
1643
1644          @Override
1645          public void onComplete() {
1646            future.complete(settings);
1647          }
1648        });
1649    return future;
1650  }
1651
1652  @Override
1653  public CompletableFuture<Void> addReplicationPeer(String peerId,
1654      ReplicationPeerConfig peerConfig, boolean enabled) {
1655    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1656      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1657      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1658      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1659  }
1660
1661  @Override
1662  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1663    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1664      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1665      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1666      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1667  }
1668
1669  @Override
1670  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1671    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1672      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1673      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1674      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1675  }
1676
1677  @Override
1678  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1679    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1680      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1681      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1682      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1683  }
1684
1685  @Override
1686  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1687    return this
1688        .<ReplicationPeerConfig> newMasterCaller()
1689        .action(
1690          (controller, stub) -> this
1691              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1692                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
1693                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1694                (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1695  }
1696
1697  @Override
1698  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1699      ReplicationPeerConfig peerConfig) {
1700    return this
1701        .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1702          RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1703          (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1704          (resp) -> resp.getProcId(),
1705          new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1706  }
1707
1708  @Override
1709  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1710      Map<TableName, List<String>> tableCfs) {
1711    if (tableCfs == null) {
1712      return failedFuture(new ReplicationException("tableCfs is null"));
1713    }
1714
1715    CompletableFuture<Void> future = new CompletableFuture<Void>();
1716    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1717      if (!completeExceptionally(future, error)) {
1718        ReplicationPeerConfig newPeerConfig =
1719          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1720        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1721          if (!completeExceptionally(future, error)) {
1722            future.complete(result);
1723          }
1724        });
1725      }
1726    });
1727    return future;
1728  }
1729
1730  @Override
1731  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1732      Map<TableName, List<String>> tableCfs) {
1733    if (tableCfs == null) {
1734      return failedFuture(new ReplicationException("tableCfs is null"));
1735    }
1736
1737    CompletableFuture<Void> future = new CompletableFuture<Void>();
1738    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1739      if (!completeExceptionally(future, error)) {
1740        ReplicationPeerConfig newPeerConfig = null;
1741        try {
1742          newPeerConfig = ReplicationPeerConfigUtil
1743            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1744        } catch (ReplicationException e) {
1745          future.completeExceptionally(e);
1746          return;
1747        }
1748        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1749          if (!completeExceptionally(future, error)) {
1750            future.complete(result);
1751          }
1752        });
1753      }
1754    });
1755    return future;
1756  }
1757
1758  @Override
1759  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1760    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1761  }
1762
1763  @Override
1764  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1765    Preconditions.checkNotNull(pattern,
1766      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1767    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1768  }
1769
1770  private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1771      ListReplicationPeersRequest request) {
1772    return this
1773        .<List<ReplicationPeerDescription>> newMasterCaller()
1774        .action(
1775          (controller, stub) -> this
1776              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
1777                controller,
1778                stub,
1779                request,
1780                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1781                (resp) -> resp.getPeerDescList().stream()
1782                    .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1783                    .collect(Collectors.toList()))).call();
1784  }
1785
1786  @Override
1787  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1788    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1789    addListener(listTableDescriptors(), (tables, error) -> {
1790      if (!completeExceptionally(future, error)) {
1791        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1792        tables.forEach(table -> {
1793          Map<String, Integer> cfs = new HashMap<>();
1794          Stream.of(table.getColumnFamilies())
1795            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1796            .forEach(column -> {
1797              cfs.put(column.getNameAsString(), column.getScope());
1798            });
1799          if (!cfs.isEmpty()) {
1800            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1801          }
1802        });
1803        future.complete(replicatedTableCFs);
1804      }
1805    });
1806    return future;
1807  }
1808
1809  @Override
1810  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1811    SnapshotProtos.SnapshotDescription snapshot =
1812      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1813    try {
1814      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1815    } catch (IllegalArgumentException e) {
1816      return failedFuture(e);
1817    }
1818    CompletableFuture<Void> future = new CompletableFuture<>();
1819    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1820    addListener(this.<Long> newMasterCaller()
1821      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1822        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1823        resp -> resp.getExpectedTimeout()))
1824      .call(), (expectedTimeout, err) -> {
1825        if (err != null) {
1826          future.completeExceptionally(err);
1827          return;
1828        }
1829        TimerTask pollingTask = new TimerTask() {
1830          int tries = 0;
1831          long startTime = EnvironmentEdgeManager.currentTime();
1832          long endTime = startTime + expectedTimeout;
1833          long maxPauseTime = expectedTimeout / maxAttempts;
1834
1835          @Override
1836          public void run(Timeout timeout) throws Exception {
1837            if (EnvironmentEdgeManager.currentTime() < endTime) {
1838              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1839                if (err2 != null) {
1840                  future.completeExceptionally(err2);
1841                } else if (done) {
1842                  future.complete(null);
1843                } else {
1844                  // retry again after pauseTime.
1845                  long pauseTime =
1846                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1847                  pauseTime = Math.min(pauseTime, maxPauseTime);
1848                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1849                    TimeUnit.MILLISECONDS);
1850                }
1851              });
1852            } else {
1853              future.completeExceptionally(
1854                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1855                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1856            }
1857          }
1858        };
1859        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1860      });
1861    return future;
1862  }
1863
1864  @Override
1865  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1866    return this
1867        .<Boolean> newMasterCaller()
1868        .action(
1869          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1870            controller,
1871            stub,
1872            IsSnapshotDoneRequest.newBuilder()
1873                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1874                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1875  }
1876
1877  @Override
1878  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1879    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1880      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1881      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1882    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1883  }
1884
1885  @Override
1886  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1887      boolean restoreAcl) {
1888    CompletableFuture<Void> future = new CompletableFuture<>();
1889    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1890      if (err != null) {
1891        future.completeExceptionally(err);
1892        return;
1893      }
1894      TableName tableName = null;
1895      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1896        for (SnapshotDescription snap : snapshotDescriptions) {
1897          if (snap.getName().equals(snapshotName)) {
1898            tableName = snap.getTableName();
1899            break;
1900          }
1901        }
1902      }
1903      if (tableName == null) {
1904        future.completeExceptionally(new RestoreSnapshotException(
1905          "Unable to find the table name for snapshot=" + snapshotName));
1906        return;
1907      }
1908      final TableName finalTableName = tableName;
1909      addListener(tableExists(finalTableName), (exists, err2) -> {
1910        if (err2 != null) {
1911          future.completeExceptionally(err2);
1912        } else if (!exists) {
1913          // if table does not exist, then just clone snapshot into new table.
1914          completeConditionalOnFuture(future,
1915            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1916        } else {
1917          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1918            if (err4 != null) {
1919              future.completeExceptionally(err4);
1920            } else if (!disabled) {
1921              future.completeExceptionally(new TableNotDisabledException(finalTableName));
1922            } else {
1923              completeConditionalOnFuture(future,
1924                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1925            }
1926          });
1927        }
1928      });
1929    });
1930    return future;
1931  }
1932
1933  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1934      boolean takeFailSafeSnapshot, boolean restoreAcl) {
1935    if (takeFailSafeSnapshot) {
1936      CompletableFuture<Void> future = new CompletableFuture<>();
1937      // Step.1 Take a snapshot of the current state
1938      String failSafeSnapshotSnapshotNameFormat =
1939        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1940          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1941      final String failSafeSnapshotSnapshotName =
1942        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1943          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1944          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1945      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1946      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1947        if (err != null) {
1948          future.completeExceptionally(err);
1949        } else {
1950          // Step.2 Restore snapshot
1951          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1952            (void2, err2) -> {
1953              if (err2 != null) {
1954                // Step.3.a Something went wrong during the restore and try to rollback.
1955                addListener(
1956                  internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1957                  (void3, err3) -> {
1958                    if (err3 != null) {
1959                      future.completeExceptionally(err3);
1960                    } else {
1961                      String msg =
1962                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1963                          failSafeSnapshotSnapshotName + " succeeded.";
1964                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1965                    }
1966                  });
1967              } else {
1968                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1969                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1970                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1971                  if (err3 != null) {
1972                    LOG.error(
1973                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1974                      err3);
1975                    future.completeExceptionally(err3);
1976                  } else {
1977                    future.complete(ret3);
1978                  }
1979                });
1980              }
1981            });
1982        }
1983      });
1984      return future;
1985    } else {
1986      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
1987    }
1988  }
1989
1990  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
1991      CompletableFuture<T> parentFuture) {
1992    addListener(parentFuture, (res, err) -> {
1993      if (err != null) {
1994        dependentFuture.completeExceptionally(err);
1995      } else {
1996        dependentFuture.complete(res);
1997      }
1998    });
1999  }
2000
2001  @Override
2002  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2003      boolean restoreAcl) {
2004    CompletableFuture<Void> future = new CompletableFuture<>();
2005    addListener(tableExists(tableName), (exists, err) -> {
2006      if (err != null) {
2007        future.completeExceptionally(err);
2008      } else if (exists) {
2009        future.completeExceptionally(new TableExistsException(tableName));
2010      } else {
2011        completeConditionalOnFuture(future,
2012          internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
2013      }
2014    });
2015    return future;
2016  }
2017
2018  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2019      boolean restoreAcl) {
2020    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2021      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2022    try {
2023      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2024    } catch (IllegalArgumentException e) {
2025      return failedFuture(e);
2026    }
2027    return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2028      .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2029        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2030          .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2031        (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2032      .call());
2033  }
2034
2035  @Override
2036  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2037    return getCompletedSnapshots(null);
2038  }
2039
2040  @Override
2041  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2042    Preconditions.checkNotNull(pattern,
2043      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2044    return getCompletedSnapshots(pattern);
2045  }
2046
2047  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2048    return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2049        .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2050        call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2051          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2052          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2053        .call();
2054  }
2055
2056  @Override
2057  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2058    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2059        + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2060    return getCompletedSnapshots(tableNamePattern, null);
2061  }
2062
2063  @Override
2064  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2065      Pattern snapshotNamePattern) {
2066    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2067        + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2068    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2069        + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2070    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2071  }
2072
2073  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2074      Pattern tableNamePattern, Pattern snapshotNamePattern) {
2075    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2076    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2077      if (err != null) {
2078        future.completeExceptionally(err);
2079        return;
2080      }
2081      if (tableNames == null || tableNames.size() <= 0) {
2082        future.complete(Collections.emptyList());
2083        return;
2084      }
2085      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2086        if (err2 != null) {
2087          future.completeExceptionally(err2);
2088          return;
2089        }
2090        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2091          future.complete(Collections.emptyList());
2092          return;
2093        }
2094        future.complete(snapshotDescList.stream()
2095          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2096          .collect(Collectors.toList()));
2097      });
2098    });
2099    return future;
2100  }
2101
2102  @Override
2103  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2104    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2105  }
2106
2107  @Override
2108  public CompletableFuture<Void> deleteSnapshots() {
2109    return internalDeleteSnapshots(null, null);
2110  }
2111
2112  @Override
2113  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2114    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2115        + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2116    return internalDeleteSnapshots(null, snapshotNamePattern);
2117  }
2118
2119  @Override
2120  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2121    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2122        + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2123    return internalDeleteSnapshots(tableNamePattern, null);
2124  }
2125
2126  @Override
2127  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2128      Pattern snapshotNamePattern) {
2129    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2130        + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2131    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2132        + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2133    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2134  }
2135
2136  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2137      Pattern snapshotNamePattern) {
2138    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2139    if (tableNamePattern == null) {
2140      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2141    } else {
2142      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2143    }
2144    CompletableFuture<Void> future = new CompletableFuture<>();
2145    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2146      if (err != null) {
2147        future.completeExceptionally(err);
2148        return;
2149      }
2150      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2151        future.complete(null);
2152        return;
2153      }
2154      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2155        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2156          if (e != null) {
2157            future.completeExceptionally(e);
2158          } else {
2159            future.complete(v);
2160          }
2161        });
2162    }));
2163    return future;
2164  }
2165
2166  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2167    return this
2168        .<Void> newMasterCaller()
2169        .action(
2170          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2171            controller,
2172            stub,
2173            DeleteSnapshotRequest.newBuilder()
2174                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2175                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2176  }
2177
2178  @Override
2179  public CompletableFuture<Void> execProcedure(String signature, String instance,
2180      Map<String, String> props) {
2181    CompletableFuture<Void> future = new CompletableFuture<>();
2182    ProcedureDescription procDesc =
2183      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2184    addListener(this.<Long> newMasterCaller()
2185      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2186        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2187        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2188      .call(), (expectedTimeout, err) -> {
2189        if (err != null) {
2190          future.completeExceptionally(err);
2191          return;
2192        }
2193        TimerTask pollingTask = new TimerTask() {
2194          int tries = 0;
2195          long startTime = EnvironmentEdgeManager.currentTime();
2196          long endTime = startTime + expectedTimeout;
2197          long maxPauseTime = expectedTimeout / maxAttempts;
2198
2199          @Override
2200          public void run(Timeout timeout) throws Exception {
2201            if (EnvironmentEdgeManager.currentTime() < endTime) {
2202              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2203                if (err2 != null) {
2204                  future.completeExceptionally(err2);
2205                  return;
2206                }
2207                if (done) {
2208                  future.complete(null);
2209                } else {
2210                  // retry again after pauseTime.
2211                  long pauseTime =
2212                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2213                  pauseTime = Math.min(pauseTime, maxPauseTime);
2214                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2215                    TimeUnit.MICROSECONDS);
2216                }
2217              });
2218            } else {
2219              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2220                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2221            }
2222          }
2223        };
2224        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2225        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2226      });
2227    return future;
2228  }
2229
2230  @Override
2231  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2232      Map<String, String> props) {
2233    ProcedureDescription proDesc =
2234        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2235    return this.<byte[]> newMasterCaller()
2236        .action(
2237          (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2238            controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2239            (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2240            resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2241        .call();
2242  }
2243
2244  @Override
2245  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2246      Map<String, String> props) {
2247    ProcedureDescription proDesc =
2248        ProtobufUtil.buildProcedureDescription(signature, instance, props);
2249    return this.<Boolean> newMasterCaller()
2250        .action((controller, stub) -> this
2251            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2252              IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2253              (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2254        .call();
2255  }
2256
2257  @Override
2258  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2259    return this.<Boolean> newMasterCaller().action(
2260      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2261        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2262        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2263        .call();
2264  }
2265
2266  @Override
2267  public CompletableFuture<String> getProcedures() {
2268    return this
2269        .<String> newMasterCaller()
2270        .action(
2271          (controller, stub) -> this
2272              .<GetProceduresRequest, GetProceduresResponse, String> call(
2273                controller, stub, GetProceduresRequest.newBuilder().build(),
2274                (s, c, req, done) -> s.getProcedures(c, req, done),
2275                resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2276  }
2277
2278  @Override
2279  public CompletableFuture<String> getLocks() {
2280    return this
2281        .<String> newMasterCaller()
2282        .action(
2283          (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2284            controller, stub, GetLocksRequest.newBuilder().build(),
2285            (s, c, req, done) -> s.getLocks(c, req, done),
2286            resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2287  }
2288
2289  @Override
2290  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
2291    return this.<Void> newMasterCaller()
2292        .action((controller, stub) -> this
2293          .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2294            controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2295            (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2296        .call();
2297  }
2298
2299  @Override
2300  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2301    return this.<List<ServerName>> newMasterCaller()
2302        .action((controller, stub) -> this
2303          .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2304            List<ServerName>> call(
2305              controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2306              (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2307              resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2308                  .collect(Collectors.toList())))
2309        .call();
2310  }
2311
2312  @Override
2313  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2314      List<byte[]> encodedRegionNames) {
2315    return this.<Void> newMasterCaller()
2316        .action((controller, stub) -> this
2317          .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
2318            stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2319            (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2320        .call();
2321  }
2322
2323  /**
2324   * Get the region location for the passed region name. The region name may be a full region name
2325   * or encoded region name. If the region does not found, then it'll throw an
2326   * UnknownRegionException wrapped by a {@link CompletableFuture}
2327   * @param regionNameOrEncodedRegionName region name or encoded region name
2328   * @return region location, wrapped by a {@link CompletableFuture}
2329   */
2330  @VisibleForTesting
2331  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2332    if (regionNameOrEncodedRegionName == null) {
2333      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2334    }
2335    try {
2336      CompletableFuture<Optional<HRegionLocation>> future;
2337      if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2338        String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2339        if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2340          // old format encodedName, should be meta region
2341          future = connection.registry.getMetaRegionLocation()
2342            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2343              .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2344        } else {
2345          future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2346            regionNameOrEncodedRegionName);
2347        }
2348      } else {
2349        RegionInfo regionInfo =
2350          MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2351        if (regionInfo.isMetaRegion()) {
2352          future = connection.registry.getMetaRegionLocation()
2353            .thenApply(locs -> Stream.of(locs.getRegionLocations())
2354              .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2355              .findFirst());
2356        } else {
2357          future =
2358            AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2359        }
2360      }
2361
2362      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2363      addListener(future, (location, err) -> {
2364        if (err != null) {
2365          returnedFuture.completeExceptionally(err);
2366          return;
2367        }
2368        if (!location.isPresent() || location.get().getRegion() == null) {
2369          returnedFuture.completeExceptionally(
2370            new UnknownRegionException("Invalid region name or encoded region name: " +
2371              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2372        } else {
2373          returnedFuture.complete(location.get());
2374        }
2375      });
2376      return returnedFuture;
2377    } catch (IOException e) {
2378      return failedFuture(e);
2379    }
2380  }
2381
2382  /**
2383   * Get the region info for the passed region name. The region name may be a full region name or
2384   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2385   * wrapped by a {@link CompletableFuture}
2386   * @param regionNameOrEncodedRegionName
2387   * @return region info, wrapped by a {@link CompletableFuture}
2388   */
2389  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2390    if (regionNameOrEncodedRegionName == null) {
2391      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2392    }
2393
2394    if (Bytes.equals(regionNameOrEncodedRegionName,
2395      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2396      Bytes.equals(regionNameOrEncodedRegionName,
2397        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2398      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2399    }
2400
2401    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2402    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2403      if (err != null) {
2404        future.completeExceptionally(err);
2405      } else {
2406        future.complete(location.getRegion());
2407      }
2408    });
2409    return future;
2410  }
2411
2412  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2413    if (numRegions < 3) {
2414      throw new IllegalArgumentException("Must create at least three regions");
2415    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2416      throw new IllegalArgumentException("Start key must be smaller than end key");
2417    }
2418    if (numRegions == 3) {
2419      return new byte[][] { startKey, endKey };
2420    }
2421    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2422    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2423      throw new IllegalArgumentException("Unable to split key range into enough regions");
2424    }
2425    return splitKeys;
2426  }
2427
2428  private void verifySplitKeys(byte[][] splitKeys) {
2429    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2430    // Verify there are no duplicate split keys
2431    byte[] lastKey = null;
2432    for (byte[] splitKey : splitKeys) {
2433      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2434        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2435      }
2436      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2437        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2438            + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2439      }
2440      lastKey = splitKey;
2441    }
2442  }
2443
2444  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2445
2446    abstract void onFinished();
2447
2448    abstract void onError(Throwable error);
2449
2450    @Override
2451    public void accept(Void v, Throwable error) {
2452      if (error != null) {
2453        onError(error);
2454        return;
2455      }
2456      onFinished();
2457    }
2458  }
2459
2460  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2461    protected final TableName tableName;
2462
2463    TableProcedureBiConsumer(TableName tableName) {
2464      this.tableName = tableName;
2465    }
2466
2467    abstract String getOperationType();
2468
2469    String getDescription() {
2470      return "Operation: " + getOperationType() + ", " + "Table Name: "
2471          + tableName.getNameWithNamespaceInclAsString();
2472    }
2473
2474    @Override
2475    void onFinished() {
2476      LOG.info(getDescription() + " completed");
2477    }
2478
2479    @Override
2480    void onError(Throwable error) {
2481      LOG.info(getDescription() + " failed with " + error.getMessage());
2482    }
2483  }
2484
2485  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2486    protected final String namespaceName;
2487
2488    NamespaceProcedureBiConsumer(String namespaceName) {
2489      this.namespaceName = namespaceName;
2490    }
2491
2492    abstract String getOperationType();
2493
2494    String getDescription() {
2495      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2496    }
2497
2498    @Override
2499    void onFinished() {
2500      LOG.info(getDescription() + " completed");
2501    }
2502
2503    @Override
2504    void onError(Throwable error) {
2505      LOG.info(getDescription() + " failed with " + error.getMessage());
2506    }
2507  }
2508
2509  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2510
2511    CreateTableProcedureBiConsumer(TableName tableName) {
2512      super(tableName);
2513    }
2514
2515    @Override
2516    String getOperationType() {
2517      return "CREATE";
2518    }
2519  }
2520
2521  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2522
2523    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2524      super(tableName);
2525    }
2526
2527    @Override
2528    String getOperationType() {
2529      return "ENABLE";
2530    }
2531  }
2532
2533  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2534
2535    DeleteTableProcedureBiConsumer(TableName tableName) {
2536      super(tableName);
2537    }
2538
2539    @Override
2540    String getOperationType() {
2541      return "DELETE";
2542    }
2543
2544    @Override
2545    void onFinished() {
2546      connection.getLocator().clearCache(this.tableName);
2547      super.onFinished();
2548    }
2549  }
2550
2551  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2552
2553    TruncateTableProcedureBiConsumer(TableName tableName) {
2554      super(tableName);
2555    }
2556
2557    @Override
2558    String getOperationType() {
2559      return "TRUNCATE";
2560    }
2561  }
2562
2563  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2564
2565    EnableTableProcedureBiConsumer(TableName tableName) {
2566      super(tableName);
2567    }
2568
2569    @Override
2570    String getOperationType() {
2571      return "ENABLE";
2572    }
2573  }
2574
2575  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2576
2577    DisableTableProcedureBiConsumer(TableName tableName) {
2578      super(tableName);
2579    }
2580
2581    @Override
2582    String getOperationType() {
2583      return "DISABLE";
2584    }
2585  }
2586
2587  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2588
2589    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2590      super(tableName);
2591    }
2592
2593    @Override
2594    String getOperationType() {
2595      return "ADD_COLUMN_FAMILY";
2596    }
2597  }
2598
2599  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2600
2601    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2602      super(tableName);
2603    }
2604
2605    @Override
2606    String getOperationType() {
2607      return "DELETE_COLUMN_FAMILY";
2608    }
2609  }
2610
2611  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2612
2613    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2614      super(tableName);
2615    }
2616
2617    @Override
2618    String getOperationType() {
2619      return "MODIFY_COLUMN_FAMILY";
2620    }
2621  }
2622
2623  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2624
2625    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2626      super(namespaceName);
2627    }
2628
2629    @Override
2630    String getOperationType() {
2631      return "CREATE_NAMESPACE";
2632    }
2633  }
2634
2635  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2636
2637    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2638      super(namespaceName);
2639    }
2640
2641    @Override
2642    String getOperationType() {
2643      return "DELETE_NAMESPACE";
2644    }
2645  }
2646
2647  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2648
2649    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2650      super(namespaceName);
2651    }
2652
2653    @Override
2654    String getOperationType() {
2655      return "MODIFY_NAMESPACE";
2656    }
2657  }
2658
2659  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2660
2661    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2662      super(tableName);
2663    }
2664
2665    @Override
2666    String getOperationType() {
2667      return "MERGE_REGIONS";
2668    }
2669  }
2670
2671  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2672
2673    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2674      super(tableName);
2675    }
2676
2677    @Override
2678    String getOperationType() {
2679      return "SPLIT_REGION";
2680    }
2681  }
2682
2683  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2684    private final String peerId;
2685    private final Supplier<String> getOperation;
2686
2687    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2688      this.peerId = peerId;
2689      this.getOperation = getOperation;
2690    }
2691
2692    String getDescription() {
2693      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2694    }
2695
2696    @Override
2697    void onFinished() {
2698      LOG.info(getDescription() + " completed");
2699    }
2700
2701    @Override
2702    void onError(Throwable error) {
2703      LOG.info(getDescription() + " failed with " + error.getMessage());
2704    }
2705  }
2706
2707  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2708    CompletableFuture<Void> future = new CompletableFuture<>();
2709    addListener(procFuture, (procId, error) -> {
2710      if (error != null) {
2711        future.completeExceptionally(error);
2712        return;
2713      }
2714      getProcedureResult(procId, future, 0);
2715    });
2716    return future;
2717  }
2718
2719  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2720    addListener(
2721      this.<GetProcedureResultResponse> newMasterCaller()
2722        .action((controller, stub) -> this
2723          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2724            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2725            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2726        .call(),
2727      (response, error) -> {
2728        if (error != null) {
2729          LOG.warn("failed to get the procedure result procId={}", procId,
2730            ConnectionUtils.translateException(error));
2731          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2732            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2733          return;
2734        }
2735        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2736          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2737            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2738          return;
2739        }
2740        if (response.hasException()) {
2741          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2742          future.completeExceptionally(ioe);
2743        } else {
2744          future.complete(null);
2745        }
2746      });
2747  }
2748
2749  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2750    CompletableFuture<T> future = new CompletableFuture<>();
2751    future.completeExceptionally(error);
2752    return future;
2753  }
2754
2755  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2756    if (error != null) {
2757      future.completeExceptionally(error);
2758      return true;
2759    }
2760    return false;
2761  }
2762
2763  @Override
2764  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2765    return getClusterMetrics(EnumSet.allOf(Option.class));
2766  }
2767
2768  @Override
2769  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2770    return this
2771        .<ClusterMetrics> newMasterCaller()
2772        .action(
2773          (controller, stub) -> this
2774              .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2775                stub, RequestConverter.buildGetClusterStatusRequest(options),
2776                (s, c, req, done) -> s.getClusterStatus(c, req, done),
2777                resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2778  }
2779
2780  @Override
2781  public CompletableFuture<Void> shutdown() {
2782    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2783      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2784        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2785        resp -> null))
2786      .call();
2787  }
2788
2789  @Override
2790  public CompletableFuture<Void> stopMaster() {
2791    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2792      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2793        controller, stub, StopMasterRequest.newBuilder().build(),
2794        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2795      .call();
2796  }
2797
2798  @Override
2799  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2800    StopServerRequest request = RequestConverter
2801      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2802    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2803      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2804        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2805        resp -> null))
2806      .serverName(serverName).call();
2807  }
2808
2809  @Override
2810  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2811    return this
2812        .<Void> newAdminCaller()
2813        .action(
2814          (controller, stub) -> this
2815              .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2816                controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2817                (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2818        .serverName(serverName).call();
2819  }
2820
2821  @Override
2822  public CompletableFuture<Void> updateConfiguration() {
2823    CompletableFuture<Void> future = new CompletableFuture<Void>();
2824    addListener(
2825      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2826      (status, err) -> {
2827        if (err != null) {
2828          future.completeExceptionally(err);
2829        } else {
2830          List<CompletableFuture<Void>> futures = new ArrayList<>();
2831          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2832          futures.add(updateConfiguration(status.getMasterName()));
2833          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2834          addListener(
2835            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2836            (result, err2) -> {
2837              if (err2 != null) {
2838                future.completeExceptionally(err2);
2839              } else {
2840                future.complete(result);
2841              }
2842            });
2843        }
2844      });
2845    return future;
2846  }
2847
2848  @Override
2849  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2850    return this
2851        .<Void> newAdminCaller()
2852        .action(
2853          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2854            controller, stub, RequestConverter.buildRollWALWriterRequest(),
2855            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2856        .serverName(serverName).call();
2857  }
2858
2859  @Override
2860  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2861    return this
2862        .<Void> newAdminCaller()
2863        .action(
2864          (controller, stub) -> this
2865              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2866                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2867                    c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2868        .serverName(serverName).call();
2869  }
2870
2871  @Override
2872  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2873    return this
2874        .<List<SecurityCapability>> newMasterCaller()
2875        .action(
2876          (controller, stub) -> this
2877              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
2878                controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
2879                    done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
2880                    .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
2881  }
2882
2883  @Override
2884  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2885    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2886  }
2887
2888  @Override
2889  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2890      TableName tableName) {
2891    Preconditions.checkNotNull(tableName,
2892      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2893    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2894  }
2895
2896  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2897      ServerName serverName) {
2898    return this.<List<RegionMetrics>> newAdminCaller()
2899        .action((controller, stub) -> this
2900            .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2901              adminCall(controller, stub, request, (s, c, req, done) ->
2902                s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2903        .serverName(serverName).call();
2904  }
2905
2906  @Override
2907  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2908    return this
2909        .<Boolean> newMasterCaller()
2910        .action(
2911          (controller, stub) -> this
2912              .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2913                stub, IsInMaintenanceModeRequest.newBuilder().build(),
2914                (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2915                resp -> resp.getInMaintenanceMode())).call();
2916  }
2917
2918  @Override
2919  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2920      CompactType compactType) {
2921    CompletableFuture<CompactionState> future = new CompletableFuture<>();
2922
2923    switch (compactType) {
2924      case MOB:
2925        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
2926          if (err != null) {
2927            future.completeExceptionally(err);
2928            return;
2929          }
2930          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2931
2932          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2933            .action((controller, stub) -> this
2934              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2935                controller, stub,
2936                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2937                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2938            .call(), (resp2, err2) -> {
2939              if (err2 != null) {
2940                future.completeExceptionally(err2);
2941              } else {
2942                if (resp2.hasCompactionState()) {
2943                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2944                } else {
2945                  future.complete(CompactionState.NONE);
2946                }
2947              }
2948            });
2949        });
2950        break;
2951      case NORMAL:
2952        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2953          if (err != null) {
2954            future.completeExceptionally(err);
2955            return;
2956          }
2957          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
2958          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2959          locations.stream().filter(loc -> loc.getServerName() != null)
2960            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2961            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2962              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
2963                // If any region compaction state is MAJOR_AND_MINOR
2964                // the table compaction state is MAJOR_AND_MINOR, too.
2965                if (err2 != null) {
2966                  future.completeExceptionally(unwrapCompletionException(err2));
2967                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
2968                  future.complete(regionState);
2969                } else {
2970                  regionStates.add(regionState);
2971                }
2972              }));
2973            });
2974          addListener(
2975            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2976            (ret, err3) -> {
2977              // If future not completed, check all regions's compaction state
2978              if (!future.isCompletedExceptionally() && !future.isDone()) {
2979                CompactionState state = CompactionState.NONE;
2980                for (CompactionState regionState : regionStates) {
2981                  switch (regionState) {
2982                    case MAJOR:
2983                      if (state == CompactionState.MINOR) {
2984                        future.complete(CompactionState.MAJOR_AND_MINOR);
2985                      } else {
2986                        state = CompactionState.MAJOR;
2987                      }
2988                      break;
2989                    case MINOR:
2990                      if (state == CompactionState.MAJOR) {
2991                        future.complete(CompactionState.MAJOR_AND_MINOR);
2992                      } else {
2993                        state = CompactionState.MINOR;
2994                      }
2995                      break;
2996                    case NONE:
2997                    default:
2998                  }
2999                }
3000                if (!future.isDone()) {
3001                  future.complete(state);
3002                }
3003              }
3004            });
3005        });
3006        break;
3007      default:
3008        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3009    }
3010
3011    return future;
3012  }
3013
3014  @Override
3015  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3016    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3017    addListener(getRegionLocation(regionName), (location, err) -> {
3018      if (err != null) {
3019        future.completeExceptionally(err);
3020        return;
3021      }
3022      ServerName serverName = location.getServerName();
3023      if (serverName == null) {
3024        future
3025          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3026        return;
3027      }
3028      addListener(
3029        this.<GetRegionInfoResponse> newAdminCaller()
3030          .action((controller, stub) -> this
3031            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3032              controller, stub,
3033              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3034                true),
3035              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3036          .serverName(serverName).call(),
3037        (resp2, err2) -> {
3038          if (err2 != null) {
3039            future.completeExceptionally(err2);
3040          } else {
3041            if (resp2.hasCompactionState()) {
3042              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3043            } else {
3044              future.complete(CompactionState.NONE);
3045            }
3046          }
3047        });
3048    });
3049    return future;
3050  }
3051
3052  @Override
3053  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3054    MajorCompactionTimestampRequest request =
3055        MajorCompactionTimestampRequest.newBuilder()
3056            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3057    return this
3058        .<Optional<Long>> newMasterCaller()
3059        .action(
3060          (controller, stub) -> this
3061              .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
3062                controller, stub, request,
3063                (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3064                ProtobufUtil::toOptionalTimestamp)).call();
3065  }
3066
3067  @Override
3068  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3069      byte[] regionName) {
3070    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3071    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3072    addListener(getRegionInfo(regionName), (region, err) -> {
3073      if (err != null) {
3074        future.completeExceptionally(err);
3075        return;
3076      }
3077      MajorCompactionTimestampForRegionRequest.Builder builder =
3078        MajorCompactionTimestampForRegionRequest.newBuilder();
3079      builder.setRegion(
3080        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3081      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3082        .<MajorCompactionTimestampForRegionRequest,
3083        MajorCompactionTimestampResponse, Optional<Long>> call(
3084          controller, stub, builder.build(),
3085          (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3086          ProtobufUtil::toOptionalTimestamp))
3087        .call(), (timestamp, err2) -> {
3088          if (err2 != null) {
3089            future.completeExceptionally(err2);
3090          } else {
3091            future.complete(timestamp);
3092          }
3093        });
3094    });
3095    return future;
3096  }
3097
3098  @Override
3099  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3100      List<String> serverNamesList) {
3101    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3102    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3103      if (err != null) {
3104        future.completeExceptionally(err);
3105        return;
3106      }
3107      // Accessed by multiple threads.
3108      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3109      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3110      serverNames.stream().forEach(serverName -> {
3111        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3112          if (err2 != null) {
3113            future.completeExceptionally(unwrapCompletionException(err2));
3114          } else {
3115            serverStates.put(serverName, serverState);
3116          }
3117        }));
3118      });
3119      addListener(
3120        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3121        (ret, err3) -> {
3122          if (!future.isCompletedExceptionally()) {
3123            if (err3 != null) {
3124              future.completeExceptionally(err3);
3125            } else {
3126              future.complete(serverStates);
3127            }
3128          }
3129        });
3130    });
3131    return future;
3132  }
3133
3134  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3135    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3136    if (serverNamesList.isEmpty()) {
3137      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3138        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3139      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3140        if (err != null) {
3141          future.completeExceptionally(err);
3142        } else {
3143          future.complete(clusterMetrics.getServersName());
3144        }
3145      });
3146      return future;
3147    } else {
3148      List<ServerName> serverList = new ArrayList<>();
3149      for (String regionServerName : serverNamesList) {
3150        ServerName serverName = null;
3151        try {
3152          serverName = ServerName.valueOf(regionServerName);
3153        } catch (Exception e) {
3154          future.completeExceptionally(
3155            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3156        }
3157        if (serverName == null) {
3158          future.completeExceptionally(
3159            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3160        } else {
3161          serverList.add(serverName);
3162        }
3163      }
3164      future.complete(serverList);
3165    }
3166    return future;
3167  }
3168
3169  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3170    return this
3171        .<Boolean>newAdminCaller()
3172        .serverName(serverName)
3173        .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3174            Boolean>adminCall(controller, stub,
3175            CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3176            s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3177  }
3178
3179  @Override
3180  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3181    return this.<Boolean> newMasterCaller()
3182      .action((controller, stub) -> this
3183        .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3184          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3185          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3186          (resp) -> resp.getPrevBalanceValue()))
3187      .call();
3188  }
3189
3190  @Override
3191  public CompletableFuture<Boolean> balance(boolean forcible) {
3192    return this
3193        .<Boolean> newMasterCaller()
3194        .action(
3195          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3196            stub, RequestConverter.buildBalanceRequest(forcible),
3197            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3198  }
3199
3200  @Override
3201  public CompletableFuture<Boolean> isBalancerEnabled() {
3202    return this
3203        .<Boolean> newMasterCaller()
3204        .action(
3205          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
3206            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3207            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3208        .call();
3209  }
3210
3211  @Override
3212  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3213    return this
3214        .<Boolean> newMasterCaller()
3215        .action(
3216          (controller, stub) -> this
3217              .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3218                controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3219                    req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3220                    .getPrevNormalizerValue())).call();
3221  }
3222
3223  @Override
3224  public CompletableFuture<Boolean> isNormalizerEnabled() {
3225    return this
3226        .<Boolean> newMasterCaller()
3227        .action(
3228          (controller, stub) -> this
3229              .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3230                stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3231                (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3232                (resp) -> resp.getEnabled())).call();
3233  }
3234
3235  @Override
3236  public CompletableFuture<Boolean> normalize() {
3237    return this
3238        .<Boolean> newMasterCaller()
3239        .action(
3240          (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3241            controller, stub, RequestConverter.buildNormalizeRequest(),
3242            (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3243        .call();
3244  }
3245
3246  @Override
3247  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3248    return this
3249        .<Boolean> newMasterCaller()
3250        .action(
3251          (controller, stub) -> this
3252              .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3253                controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3254                    c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3255                    .getPrevValue())).call();
3256  }
3257
3258  @Override
3259  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3260    return this
3261        .<Boolean> newMasterCaller()
3262        .action(
3263          (controller, stub) -> this
3264              .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3265                controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3266                    done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3267        .call();
3268  }
3269
3270  @Override
3271  public CompletableFuture<Boolean> runCleanerChore() {
3272    return this
3273        .<Boolean> newMasterCaller()
3274        .action(
3275          (controller, stub) -> this
3276              .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3277                RequestConverter.buildRunCleanerChoreRequest(),
3278                (s, c, req, done) -> s.runCleanerChore(c, req, done),
3279                (resp) -> resp.getCleanerChoreRan())).call();
3280  }
3281
3282  @Override
3283  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3284    return this
3285        .<Boolean> newMasterCaller()
3286        .action(
3287          (controller, stub) -> this
3288              .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3289                controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3290                    c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3291                    .getPrevValue())).call();
3292  }
3293
3294  @Override
3295  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3296    return this
3297        .<Boolean> newMasterCaller()
3298        .action(
3299          (controller, stub) -> this
3300              .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3301                controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3302                    req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3303                    .getValue())).call();
3304  }
3305
3306  @Override
3307  public CompletableFuture<Integer> runCatalogJanitor() {
3308    return this
3309        .<Integer> newMasterCaller()
3310        .action(
3311          (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3312            controller, stub, RequestConverter.buildCatalogScanRequest(),
3313            (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3314        .call();
3315  }
3316
3317  @Override
3318  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3319      ServiceCaller<S, R> callable) {
3320    MasterCoprocessorRpcChannelImpl channel =
3321        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3322    S stub = stubMaker.apply(channel);
3323    CompletableFuture<R> future = new CompletableFuture<>();
3324    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3325    callable.call(stub, controller, resp -> {
3326      if (controller.failed()) {
3327        future.completeExceptionally(controller.getFailed());
3328      } else {
3329        future.complete(resp);
3330      }
3331    });
3332    return future;
3333  }
3334
3335  @Override
3336  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3337      ServiceCaller<S, R> callable, ServerName serverName) {
3338    RegionServerCoprocessorRpcChannelImpl channel =
3339        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3340          serverName));
3341    S stub = stubMaker.apply(channel);
3342    CompletableFuture<R> future = new CompletableFuture<>();
3343    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3344    callable.call(stub, controller, resp -> {
3345      if (controller.failed()) {
3346        future.completeExceptionally(controller.getFailed());
3347      } else {
3348        future.complete(resp);
3349      }
3350    });
3351    return future;
3352  }
3353
3354  @Override
3355  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3356    return this.<List<ServerName>> newMasterCaller()
3357      .action((controller, stub) -> this
3358        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3359          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3360          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3361          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3362      .call();
3363  }
3364
3365  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3366    return this.connection.callerFactory.<T> serverRequest()
3367      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3368      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3369      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3370      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3371  }
3372
3373  @Override
3374  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3375    if (tableName == null) {
3376      return failedFuture(new IllegalArgumentException("Table name is null"));
3377    }
3378    CompletableFuture<Void> future = new CompletableFuture<>();
3379    addListener(tableExists(tableName), (exist, err) -> {
3380      if (err != null) {
3381        future.completeExceptionally(err);
3382        return;
3383      }
3384      if (!exist) {
3385        future.completeExceptionally(new TableNotFoundException(
3386          "Table '" + tableName.getNameAsString() + "' does not exists."));
3387        return;
3388      }
3389      addListener(getTableSplits(tableName), (splits, err1) -> {
3390        if (err1 != null) {
3391          future.completeExceptionally(err1);
3392        } else {
3393          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3394            if (err2 != null) {
3395              future.completeExceptionally(err2);
3396            } else {
3397              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3398                if (err3 != null) {
3399                  future.completeExceptionally(err3);
3400                } else {
3401                  future.complete(result3);
3402                }
3403              });
3404            }
3405          });
3406        }
3407      });
3408    });
3409    return future;
3410  }
3411
3412  @Override
3413  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3414    if (tableName == null) {
3415      return failedFuture(new IllegalArgumentException("Table name is null"));
3416    }
3417    CompletableFuture<Void> future = new CompletableFuture<>();
3418    addListener(tableExists(tableName), (exist, err) -> {
3419      if (err != null) {
3420        future.completeExceptionally(err);
3421        return;
3422      }
3423      if (!exist) {
3424        future.completeExceptionally(new TableNotFoundException(
3425          "Table '" + tableName.getNameAsString() + "' does not exists."));
3426        return;
3427      }
3428      addListener(setTableReplication(tableName, false), (result, err2) -> {
3429        if (err2 != null) {
3430          future.completeExceptionally(err2);
3431        } else {
3432          future.complete(result);
3433        }
3434      });
3435    });
3436    return future;
3437  }
3438
3439  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3440    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3441    addListener(
3442      getRegions(tableName).thenApply(regions -> regions.stream()
3443        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3444      (regions, err2) -> {
3445        if (err2 != null) {
3446          future.completeExceptionally(err2);
3447          return;
3448        }
3449        if (regions.size() == 1) {
3450          future.complete(null);
3451        } else {
3452          byte[][] splits = new byte[regions.size() - 1][];
3453          for (int i = 1; i < regions.size(); i++) {
3454            splits[i - 1] = regions.get(i).getStartKey();
3455          }
3456          future.complete(splits);
3457        }
3458      });
3459    return future;
3460  }
3461
3462  /**
3463   * Connect to peer and check the table descriptor on peer:
3464   * <ol>
3465   * <li>Create the same table on peer when not exist.</li>
3466   * <li>Throw an exception if the table already has replication enabled on any of the column
3467   * families.</li>
3468   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3469   * </ol>
3470   * @param tableName name of the table to sync to the peer
3471   * @param splits table split keys
3472   */
3473  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3474      byte[][] splits) {
3475    CompletableFuture<Void> future = new CompletableFuture<>();
3476    addListener(listReplicationPeers(), (peers, err) -> {
3477      if (err != null) {
3478        future.completeExceptionally(err);
3479        return;
3480      }
3481      if (peers == null || peers.size() <= 0) {
3482        future.completeExceptionally(
3483          new IllegalArgumentException("Found no peer cluster for replication."));
3484        return;
3485      }
3486      List<CompletableFuture<Void>> futures = new ArrayList<>();
3487      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3488        .forEach(peer -> {
3489          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3490        });
3491      addListener(
3492        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3493        (result, err2) -> {
3494          if (err2 != null) {
3495            future.completeExceptionally(err2);
3496          } else {
3497            future.complete(result);
3498          }
3499        });
3500    });
3501    return future;
3502  }
3503
3504  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3505      ReplicationPeerDescription peer) {
3506    Configuration peerConf = null;
3507    try {
3508      peerConf =
3509        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3510    } catch (IOException e) {
3511      return failedFuture(e);
3512    }
3513    CompletableFuture<Void> future = new CompletableFuture<>();
3514    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3515      if (err != null) {
3516        future.completeExceptionally(err);
3517        return;
3518      }
3519      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3520        if (err1 != null) {
3521          future.completeExceptionally(err1);
3522          return;
3523        }
3524        AsyncAdmin peerAdmin = conn.getAdmin();
3525        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3526          if (err2 != null) {
3527            future.completeExceptionally(err2);
3528            return;
3529          }
3530          if (!exist) {
3531            CompletableFuture<Void> createTableFuture = null;
3532            if (splits == null) {
3533              createTableFuture = peerAdmin.createTable(tableDesc);
3534            } else {
3535              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3536            }
3537            addListener(createTableFuture, (result, err3) -> {
3538              if (err3 != null) {
3539                future.completeExceptionally(err3);
3540              } else {
3541                future.complete(result);
3542              }
3543            });
3544          } else {
3545            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3546              (result, err4) -> {
3547                if (err4 != null) {
3548                  future.completeExceptionally(err4);
3549                } else {
3550                  future.complete(result);
3551                }
3552              });
3553          }
3554        });
3555      });
3556    });
3557    return future;
3558  }
3559
3560  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3561      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3562    CompletableFuture<Void> future = new CompletableFuture<>();
3563    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3564      if (err != null) {
3565        future.completeExceptionally(err);
3566        return;
3567      }
3568      if (peerTableDesc == null) {
3569        future.completeExceptionally(
3570          new IllegalArgumentException("Failed to get table descriptor for table " +
3571            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3572        return;
3573      }
3574      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3575        future.completeExceptionally(new IllegalArgumentException(
3576          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3577            ", but the table descriptors are not same when compared with source cluster." +
3578            " Thus can not enable the table's replication switch."));
3579        return;
3580      }
3581      future.complete(null);
3582    });
3583    return future;
3584  }
3585
3586  /**
3587   * Set the table's replication switch if the table's replication switch is already not set.
3588   * @param tableName name of the table
3589   * @param enableRep is replication switch enable or disable
3590   */
3591  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3592    CompletableFuture<Void> future = new CompletableFuture<>();
3593    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3594      if (err != null) {
3595        future.completeExceptionally(err);
3596        return;
3597      }
3598      if (!tableDesc.matchReplicationScope(enableRep)) {
3599        int scope =
3600          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3601        TableDescriptor newTableDesc =
3602          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3603        addListener(modifyTable(newTableDesc), (result, err2) -> {
3604          if (err2 != null) {
3605            future.completeExceptionally(err2);
3606          } else {
3607            future.complete(result);
3608          }
3609        });
3610      } else {
3611        future.complete(null);
3612      }
3613    });
3614    return future;
3615  }
3616
3617  @Override
3618  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3619    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3620    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3621      if (err != null) {
3622        future.completeExceptionally(err);
3623        return;
3624      }
3625      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3626        locations.stream().filter(l -> l.getRegion() != null)
3627          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3628          .collect(Collectors.groupingBy(l -> l.getServerName(),
3629            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3630      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3631      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3632      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3633        futures
3634          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3635            if (err2 != null) {
3636              future.completeExceptionally(unwrapCompletionException(err2));
3637            } else {
3638              aggregator.append(stats);
3639            }
3640          }));
3641      }
3642      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3643        (ret, err3) -> {
3644          if (err3 != null) {
3645            future.completeExceptionally(unwrapCompletionException(err3));
3646          } else {
3647            future.complete(aggregator.sum());
3648          }
3649        });
3650    });
3651    return future;
3652  }
3653
3654  @Override
3655  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3656      boolean preserveSplits) {
3657    CompletableFuture<Void> future = new CompletableFuture<>();
3658    addListener(tableExists(tableName), (exist, err) -> {
3659      if (err != null) {
3660        future.completeExceptionally(err);
3661        return;
3662      }
3663      if (!exist) {
3664        future.completeExceptionally(new TableNotFoundException(tableName));
3665        return;
3666      }
3667      addListener(tableExists(newTableName), (exist1, err1) -> {
3668        if (err1 != null) {
3669          future.completeExceptionally(err1);
3670          return;
3671        }
3672        if (exist1) {
3673          future.completeExceptionally(new TableExistsException(newTableName));
3674          return;
3675        }
3676        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3677          if (err2 != null) {
3678            future.completeExceptionally(err2);
3679            return;
3680          }
3681          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3682          if (preserveSplits) {
3683            addListener(getTableSplits(tableName), (splits, err3) -> {
3684              if (err3 != null) {
3685                future.completeExceptionally(err3);
3686              } else {
3687                addListener(
3688                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3689                  (result, err4) -> {
3690                    if (err4 != null) {
3691                      future.completeExceptionally(err4);
3692                    } else {
3693                      future.complete(result);
3694                    }
3695                  });
3696              }
3697            });
3698          } else {
3699            addListener(createTable(newTableDesc), (result, err5) -> {
3700              if (err5 != null) {
3701                future.completeExceptionally(err5);
3702              } else {
3703                future.complete(result);
3704              }
3705            });
3706          }
3707        });
3708      });
3709    });
3710    return future;
3711  }
3712
3713  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3714      List<RegionInfo> hris) {
3715    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3716      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3717        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3718        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3719        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3720      .serverName(serverName).call();
3721  }
3722
3723  @Override
3724  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3725    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3726        .action((controller, stub) -> this
3727            .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3728              SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3729              (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3730              resp -> resp.getPreviousRpcThrottleEnabled()))
3731        .call();
3732    return future;
3733  }
3734
3735  @Override
3736  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3737    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3738        .action((controller, stub) -> this
3739            .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3740              stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3741              (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3742              resp -> resp.getRpcThrottleEnabled()))
3743        .call();
3744    return future;
3745  }
3746
3747  @Override
3748  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3749    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3750        .action((controller, stub) -> this
3751            .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3752              controller, stub,
3753              SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3754                  .build(),
3755              (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3756              resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3757        .call();
3758    return future;
3759  }
3760
3761  @Override
3762  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3763    return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3764      .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3765      Map<TableName, Long>> call(controller, stub,
3766        RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3767        (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3768        resp -> resp.getSizesList().stream().collect(Collectors
3769          .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3770      .call();
3771  }
3772
3773  @Override
3774  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3775      ServerName serverName) {
3776    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3777      .action((controller, stub) -> this
3778        .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3779        Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3780          RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3781          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3782          resp -> resp.getSnapshotsList().stream()
3783            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3784              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3785      .serverName(serverName).call();
3786  }
3787
3788  private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3789      Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3790    return this.<SpaceQuotaSnapshot> newMasterCaller()
3791      .action((controller, stub) -> this
3792        .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3793          RequestConverter.buildGetQuotaStatesRequest(),
3794          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3795      .call();
3796  }
3797
3798  @Override
3799  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3800    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3801      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3802      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3803  }
3804
3805  @Override
3806  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3807    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3808    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3809      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3810      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3811  }
3812
3813  @Override
3814  public CompletableFuture<Void> grant(UserPermission userPermission,
3815      boolean mergeExistingPermissions) {
3816    return this.<Void> newMasterCaller()
3817        .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3818          stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3819          (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3820        .call();
3821  }
3822
3823  @Override
3824  public CompletableFuture<Void> revoke(UserPermission userPermission) {
3825    return this.<Void> newMasterCaller()
3826        .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3827          stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3828          (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3829        .call();
3830  }
3831
3832  @Override
3833  public CompletableFuture<List<UserPermission>>
3834      getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3835    return this.<List<UserPermission>> newMasterCaller().action((controller,
3836        stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3837            List<UserPermission>> call(controller, stub,
3838              ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3839              (s, c, req, done) -> s.getUserPermissions(c, req, done),
3840              resp -> resp.getUserPermissionList().stream()
3841                .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3842                .collect(Collectors.toList())))
3843        .call();
3844  }
3845
3846  @Override
3847  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
3848      List<Permission> permissions) {
3849    return this.<List<Boolean>> newMasterCaller()
3850        .action((controller, stub) -> this
3851            .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
3852              stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
3853              (s, c, req, done) -> s.hasUserPermissions(c, req, done),
3854              resp -> resp.getHasUserPermissionList()))
3855        .call();
3856  }
3857}