001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.net.URI;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.CatalogFamilyFormat;
053import org.apache.hadoop.hbase.ClientMetaTableAccessor;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.NamespaceDescriptor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.RegionMetrics;
063import org.apache.hadoop.hbase.RegionMetricsBuilder;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableExistsException;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotDisabledException;
068import org.apache.hadoop.hbase.TableNotEnabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.Scan.ReadType;
075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
076import org.apache.hadoop.hbase.client.replication.TableCFs;
077import org.apache.hadoop.hbase.client.security.SecurityCapability;
078import org.apache.hadoop.hbase.exceptions.DeserializationException;
079import org.apache.hadoop.hbase.ipc.HBaseRpcController;
080import org.apache.hadoop.hbase.net.Address;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.replication.ReplicationException;
086import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
087import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
088import org.apache.hadoop.hbase.replication.SyncReplicationState;
089import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
090import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
091import org.apache.hadoop.hbase.security.access.Permission;
092import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
093import org.apache.hadoop.hbase.security.access.UserPermission;
094import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
095import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
096import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
099import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
100import org.apache.hadoop.hbase.util.Pair;
101import org.apache.hadoop.hbase.util.Strings;
102import org.apache.yetus.audience.InterfaceAudience;
103import org.slf4j.Logger;
104import org.slf4j.LoggerFactory;
105
106import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
107import org.apache.hbase.thirdparty.com.google.protobuf.Message;
108import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
109import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
110import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
111import org.apache.hbase.thirdparty.io.netty.util.Timeout;
112import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
113import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
114
115import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
116import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
356import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
357
358/**
359 * The implementation of AsyncAdmin.
360 * <p>
361 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
362 * be finished inside the rpc framework thread, which means that the callbacks registered to the
363 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
364 * this class should not try to do time consuming tasks in the callbacks.
365 * @since 2.0.0
366 * @see AsyncHBaseAdmin
367 * @see AsyncConnection#getAdmin()
368 * @see AsyncConnection#getAdminBuilder()
369 */
370@InterfaceAudience.Private
371class RawAsyncHBaseAdmin implements AsyncAdmin {
372
373  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
374
375  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
376
377  private final AsyncConnectionImpl connection;
378
379  private final HashedWheelTimer retryTimer;
380
381  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
382
383  private final long rpcTimeoutNs;
384
385  private final long operationTimeoutNs;
386
387  private final long pauseNs;
388
389  private final long pauseNsForServerOverloaded;
390
391  private final int maxAttempts;
392
393  private final int startLogErrorsCnt;
394
395  private final NonceGenerator ng;
396
397  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
398    AsyncAdminBuilderBase builder) {
399    this.connection = connection;
400    this.retryTimer = retryTimer;
401    this.metaTable = connection.getTable(META_TABLE_NAME);
402    this.rpcTimeoutNs = builder.rpcTimeoutNs;
403    this.operationTimeoutNs = builder.operationTimeoutNs;
404    this.pauseNs = builder.pauseNs;
405    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
406      LOG.warn(
407        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
408          + " the normal pause value {} ms, use the greater one instead",
409        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
410        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
411      this.pauseNsForServerOverloaded = builder.pauseNs;
412    } else {
413      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
414    }
415    this.maxAttempts = builder.maxAttempts;
416    this.startLogErrorsCnt = builder.startLogErrorsCnt;
417    this.ng = connection.getNonceGenerator();
418  }
419
420  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
421    return this.connection.callerFactory.<T> masterRequest()
422      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
423      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
424      .pause(pauseNs, TimeUnit.NANOSECONDS)
425      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
426      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
427  }
428
429  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
430    return this.connection.callerFactory.<T> adminRequest()
431      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
432      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
433      .pause(pauseNs, TimeUnit.NANOSECONDS)
434      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
435      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
436  }
437
438  @FunctionalInterface
439  private interface MasterRpcCall<RESP, REQ> {
440    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
441      RpcCallback<RESP> done);
442  }
443
444  @FunctionalInterface
445  private interface AdminRpcCall<RESP, REQ> {
446    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
447      RpcCallback<RESP> done);
448  }
449
450  @FunctionalInterface
451  private interface Converter<D, S> {
452    D convert(S src) throws IOException;
453  }
454
455  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
456    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
457    Converter<RESP, PRESP> respConverter) {
458    CompletableFuture<RESP> future = new CompletableFuture<>();
459    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
460
461      @Override
462      public void run(PRESP resp) {
463        if (controller.failed()) {
464          future.completeExceptionally(controller.getFailed());
465        } else {
466          try {
467            future.complete(respConverter.convert(resp));
468          } catch (IOException e) {
469            future.completeExceptionally(e);
470          }
471        }
472      }
473    });
474    return future;
475  }
476
477  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
478    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
479    Converter<RESP, PRESP> respConverter) {
480    CompletableFuture<RESP> future = new CompletableFuture<>();
481    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
482
483      @Override
484      public void run(PRESP resp) {
485        if (controller.failed()) {
486          future.completeExceptionally(controller.getFailed());
487        } else {
488          try {
489            future.complete(respConverter.convert(resp));
490          } catch (IOException e) {
491            future.completeExceptionally(e);
492          }
493        }
494      }
495    });
496    return future;
497  }
498
499  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
500    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
501    ProcedureBiConsumer consumer) {
502    return procedureCall(b -> {
503    }, preq, rpcCall, respConverter, consumer);
504  }
505
506  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
507    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
508    ProcedureBiConsumer consumer) {
509    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
510  }
511
512  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
513    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
514    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
515    ProcedureBiConsumer consumer) {
516    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
517      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
518    prioritySetter.accept(builder);
519    CompletableFuture<Long> procFuture = builder.call();
520    CompletableFuture<Void> future = waitProcedureResult(procFuture);
521    addListener(future, consumer);
522    return future;
523  }
524
525  @Override
526  public CompletableFuture<Boolean> tableExists(TableName tableName) {
527    if (TableName.isMetaTableName(tableName)) {
528      return CompletableFuture.completedFuture(true);
529    }
530    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
531  }
532
533  @Override
534  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
535    return getTableDescriptors(
536      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
537  }
538
539  /**
540   * {@link #listTableDescriptors(boolean)}
541   */
542  @Override
543  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
544    boolean includeSysTables) {
545    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
546      + "use listTableDescriptors(boolean) instead");
547    return getTableDescriptors(
548      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
549  }
550
551  @Override
552  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
553    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
554      + "use listTableDescriptors(boolean) instead");
555    if (tableNames.isEmpty()) {
556      return CompletableFuture.completedFuture(Collections.emptyList());
557    }
558    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
559  }
560
561  private CompletableFuture<List<TableDescriptor>>
562    getTableDescriptors(GetTableDescriptorsRequest request) {
563    return this.<List<TableDescriptor>> newMasterCaller()
564      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
565        List<TableDescriptor>> call(controller, stub, request,
566          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
567          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
568      .call();
569  }
570
571  @Override
572  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
573    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
574  }
575
576  @Override
577  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
578    boolean includeSysTables) {
579    Preconditions.checkNotNull(pattern,
580      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
581    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
582  }
583
584  @Override
585  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
586    return this.<List<TableName>> newMasterCaller()
587      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
588        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
589          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
590          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
591          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
592      .call();
593  }
594
595  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
596    return this.<List<TableName>> newMasterCaller()
597      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
598        List<TableName>> call(controller, stub, request,
599          (s, c, req, done) -> s.getTableNames(c, req, done),
600          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
601      .call();
602  }
603
604  @Override
605  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
606    return this.<List<TableDescriptor>> newMasterCaller()
607      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
608        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
609          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
610          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
611          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
612      .call();
613  }
614
615  @Override
616  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
617    return this.<List<TableDescriptor>> newMasterCaller()
618      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
619        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
620          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
621          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
622          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
623      .call();
624  }
625
626  @Override
627  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
628    return this.<List<TableName>> newMasterCaller()
629      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
630        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
631          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
632          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
633          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
634      .call();
635  }
636
637  @Override
638  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
639    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
640    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
641      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
642        List<TableSchema>> call(controller, stub,
643          RequestConverter.buildGetTableDescriptorsRequest(tableName),
644          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
645          (resp) -> resp.getTableSchemaList()))
646      .call(), (tableSchemas, error) -> {
647        if (error != null) {
648          future.completeExceptionally(error);
649          return;
650        }
651        if (!tableSchemas.isEmpty()) {
652          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
653        } else {
654          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
655        }
656      });
657    return future;
658  }
659
660  @Override
661  public CompletableFuture<Void> createTable(TableDescriptor desc) {
662    return createTable(desc.getTableName(),
663      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
664  }
665
666  @Override
667  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
668    int numRegions) {
669    try {
670      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
671    } catch (IllegalArgumentException e) {
672      return failedFuture(e);
673    }
674  }
675
676  @Override
677  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
678    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
679      + " use createTable(TableDescriptor) instead");
680    try {
681      verifySplitKeys(splitKeys);
682      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
683        splitKeys, ng.getNonceGroup(), ng.newNonce()));
684    } catch (IllegalArgumentException e) {
685      return failedFuture(e);
686    }
687  }
688
689  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
690    Preconditions.checkNotNull(tableName, "table name is null");
691    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
692      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
693      new CreateTableProcedureBiConsumer(tableName));
694  }
695
696  @Override
697  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
698    return modifyTable(desc, true);
699  }
700
701  @Override
702  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
703    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
704      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
705        ng.newNonce(), reopenRegions),
706      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
707      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
708  }
709
710  @Override
711  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
712    return this.<ModifyTableStoreFileTrackerRequest,
713      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
714        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
715          ng.getNonceGroup(), ng.newNonce()),
716        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
717        (resp) -> resp.getProcId(),
718        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
719  }
720
721  @Override
722  public CompletableFuture<Void> deleteTable(TableName tableName) {
723    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
724      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
725      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
726      new DeleteTableProcedureBiConsumer(tableName));
727  }
728
729  @Override
730  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
731    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
732      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
733        ng.newNonce()),
734      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
735      new TruncateTableProcedureBiConsumer(tableName));
736  }
737
738  @Override
739  public CompletableFuture<Void> enableTable(TableName tableName) {
740    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
741      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
742      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
743      new EnableTableProcedureBiConsumer(tableName));
744  }
745
746  @Override
747  public CompletableFuture<Void> disableTable(TableName tableName) {
748    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
749      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
750      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
751      new DisableTableProcedureBiConsumer(tableName));
752  }
753
754  /**
755   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
756   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
757   * targetState).
758   */
759  private static CompletableFuture<Boolean> completeCheckTableState(
760    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
761    TableState.State targetState, TableName tableName) {
762    if (error != null) {
763      future.completeExceptionally(error);
764    } else {
765      if (tableState != null) {
766        future.complete(tableState.inStates(targetState));
767      } else {
768        future.completeExceptionally(new TableNotFoundException(tableName));
769      }
770    }
771    return future;
772  }
773
774  @Override
775  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
776    if (TableName.isMetaTableName(tableName)) {
777      return CompletableFuture.completedFuture(true);
778    }
779    CompletableFuture<Boolean> future = new CompletableFuture<>();
780    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
781      (tableState, error) -> {
782        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
783          TableState.State.ENABLED, tableName);
784      });
785    return future;
786  }
787
788  @Override
789  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
790    if (TableName.isMetaTableName(tableName)) {
791      return CompletableFuture.completedFuture(false);
792    }
793    CompletableFuture<Boolean> future = new CompletableFuture<>();
794    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
795      (tableState, error) -> {
796        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
797          TableState.State.DISABLED, tableName);
798      });
799    return future;
800  }
801
802  @Override
803  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
804    if (TableName.isMetaTableName(tableName)) {
805      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
806        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
807    }
808    CompletableFuture<Boolean> future = new CompletableFuture<>();
809    addListener(isTableEnabled(tableName), (enabled, error) -> {
810      if (error != null) {
811        if (error instanceof TableNotFoundException) {
812          future.complete(false);
813        } else {
814          future.completeExceptionally(error);
815        }
816        return;
817      }
818      if (!enabled) {
819        future.complete(false);
820      } else {
821        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
822          (locations, error1) -> {
823            if (error1 != null) {
824              future.completeExceptionally(error1);
825              return;
826            }
827            List<HRegionLocation> notDeployedRegions = locations.stream()
828              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
829            if (notDeployedRegions.size() > 0) {
830              if (LOG.isDebugEnabled()) {
831                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
832              }
833              future.complete(false);
834              return;
835            }
836            future.complete(true);
837          });
838      }
839    });
840    return future;
841  }
842
843  @Override
844  public CompletableFuture<Void> addColumnFamily(TableName tableName,
845    ColumnFamilyDescriptor columnFamily) {
846    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
847      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
848        ng.newNonce()),
849      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
850      new AddColumnFamilyProcedureBiConsumer(tableName));
851  }
852
853  @Override
854  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
855    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
856      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
857        ng.newNonce()),
858      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
859      new DeleteColumnFamilyProcedureBiConsumer(tableName));
860  }
861
862  @Override
863  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
864    ColumnFamilyDescriptor columnFamily) {
865    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
866      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
867        ng.newNonce()),
868      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
869      new ModifyColumnFamilyProcedureBiConsumer(tableName));
870  }
871
872  @Override
873  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
874    byte[] family, String dstSFT) {
875    return this.<ModifyColumnStoreFileTrackerRequest,
876      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
877        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
878          ng.getNonceGroup(), ng.newNonce()),
879        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
880        (resp) -> resp.getProcId(),
881        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
882  }
883
884  @Override
885  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
886    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
887      RequestConverter.buildCreateNamespaceRequest(descriptor),
888      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
889      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
890  }
891
892  @Override
893  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
894    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
895      RequestConverter.buildModifyNamespaceRequest(descriptor),
896      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
897      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
898  }
899
900  @Override
901  public CompletableFuture<Void> deleteNamespace(String name) {
902    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
903      RequestConverter.buildDeleteNamespaceRequest(name),
904      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
905      new DeleteNamespaceProcedureBiConsumer(name));
906  }
907
908  @Override
909  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
910    return this.<NamespaceDescriptor> newMasterCaller()
911      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
912        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
913          RequestConverter.buildGetNamespaceDescriptorRequest(name),
914          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
915          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
916      .call();
917  }
918
919  @Override
920  public CompletableFuture<List<String>> listNamespaces() {
921    return this.<List<String>> newMasterCaller()
922      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
923        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
924          (s, c, req, done) -> s.listNamespaces(c, req, done),
925          (resp) -> resp.getNamespaceNameList()))
926      .call();
927  }
928
929  @Override
930  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
931    return this.<List<NamespaceDescriptor>> newMasterCaller()
932      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
933        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
934          ListNamespaceDescriptorsRequest.newBuilder().build(),
935          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
936          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
937      .call();
938  }
939
940  @Override
941  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
942    return this.<List<RegionInfo>> newAdminCaller()
943      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
944        List<RegionInfo>> adminCall(controller, stub,
945          RequestConverter.buildGetOnlineRegionRequest(),
946          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
947          resp -> ProtobufUtil.getRegionInfos(resp)))
948      .serverName(serverName).call();
949  }
950
951  @Override
952  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
953    if (tableName.equals(META_TABLE_NAME)) {
954      return connection.registry.getMetaRegionLocations()
955        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
956          .collect(Collectors.toList()));
957    } else {
958      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
959        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
960    }
961  }
962
963  @Override
964  public CompletableFuture<Void> flush(TableName tableName) {
965    return flush(tableName, Collections.emptyList());
966  }
967
968  @Override
969  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
970    return flush(tableName, Collections.singletonList(columnFamily));
971  }
972
973  @Override
974  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
975    // This is for keeping compatibility with old implementation.
976    // If the server version is lower than the client version, it's possible that the
977    // flushTable method is not present in the server side, if so, we need to fall back
978    // to the old implementation.
979    List<byte[]> columnFamilies = columnFamilyList.stream()
980      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
981    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
982      ng.getNonceGroup(), ng.newNonce());
983    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
984      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
985      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
986    CompletableFuture<Void> future = new CompletableFuture<>();
987    addListener(procFuture, (ret, error) -> {
988      if (error != null) {
989        if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
990          future.completeExceptionally(error);
991        } else if (error instanceof DoNotRetryIOException) {
992          // usually this is caused by the method is not present on the server or
993          // the hbase hadoop version does not match the running hadoop version.
994          // if that happens, we need fall back to the old flush implementation.
995          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
996          legacyFlush(future, tableName, columnFamilies);
997        } else {
998          future.completeExceptionally(error);
999        }
1000      } else {
1001        future.complete(ret);
1002      }
1003    });
1004    return future;
1005  }
1006
1007  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1008    List<byte[]> columnFamilies) {
1009    addListener(tableExists(tableName), (exists, err) -> {
1010      if (err != null) {
1011        future.completeExceptionally(err);
1012      } else if (!exists) {
1013        future.completeExceptionally(new TableNotFoundException(tableName));
1014      } else {
1015        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1016          if (err2 != null) {
1017            future.completeExceptionally(err2);
1018          } else if (!tableEnabled) {
1019            future.completeExceptionally(new TableNotEnabledException(tableName));
1020          } else {
1021            Map<String, String> props = new HashMap<>();
1022            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1023              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1024                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1025            }
1026            addListener(
1027              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1028              (ret, err3) -> {
1029                if (err3 != null) {
1030                  future.completeExceptionally(err3);
1031                } else {
1032                  future.complete(ret);
1033                }
1034              });
1035          }
1036        });
1037      }
1038    });
1039  }
1040
1041  @Override
1042  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1043    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1044    });
1045  }
1046
1047  @Override
1048  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1049    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1050      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1051    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1052    });
1053  }
1054
1055  /**
1056   * This method is for internal use only, where we need the response of the flush.
1057   * <p/>
1058   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1059   * API.
1060   */
1061  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1062    boolean writeFlushWALMarker) {
1063    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1064    addListener(getRegionLocation(regionName), (location, err) -> {
1065      if (err != null) {
1066        future.completeExceptionally(err);
1067        return;
1068      }
1069      ServerName serverName = location.getServerName();
1070      if (serverName == null) {
1071        future
1072          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1073        return;
1074      }
1075      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1076        (ret, err2) -> {
1077          if (err2 != null) {
1078            future.completeExceptionally(err2);
1079          } else {
1080            future.complete(ret);
1081          }
1082        });
1083    });
1084    return future;
1085  }
1086
1087  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1088    byte[] columnFamily, boolean writeFlushWALMarker) {
1089    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1090      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1091        FlushRegionResponse> adminCall(controller, stub,
1092          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1093            writeFlushWALMarker),
1094          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1095      .call();
1096  }
1097
1098  @Override
1099  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1100    CompletableFuture<Void> future = new CompletableFuture<>();
1101    addListener(getRegions(sn), (hRegionInfos, err) -> {
1102      if (err != null) {
1103        future.completeExceptionally(err);
1104        return;
1105      }
1106      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1107      if (hRegionInfos != null) {
1108        hRegionInfos
1109          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1110          })));
1111      }
1112      addListener(CompletableFuture.allOf(
1113        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1114          if (err2 != null) {
1115            future.completeExceptionally(err2);
1116          } else {
1117            future.complete(ret);
1118          }
1119        });
1120    });
1121    return future;
1122  }
1123
1124  @Override
1125  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1126    return compact(tableName, null, false, compactType);
1127  }
1128
1129  @Override
1130  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1131    CompactType compactType) {
1132    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1133      + "If you don't specify a columnFamily, use compact(TableName) instead");
1134    return compact(tableName, columnFamily, false, compactType);
1135  }
1136
1137  @Override
1138  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1139    return compactRegion(regionName, null, false);
1140  }
1141
1142  @Override
1143  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1144    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1145      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1146    return compactRegion(regionName, columnFamily, false);
1147  }
1148
1149  @Override
1150  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1151    return compact(tableName, null, true, compactType);
1152  }
1153
1154  @Override
1155  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1156    CompactType compactType) {
1157    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1158      + "If you don't specify a columnFamily, use compact(TableName) instead");
1159    return compact(tableName, columnFamily, true, compactType);
1160  }
1161
1162  @Override
1163  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1164    return compactRegion(regionName, null, true);
1165  }
1166
1167  @Override
1168  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1169    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1170      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1171    return compactRegion(regionName, columnFamily, true);
1172  }
1173
1174  @Override
1175  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1176    return compactRegionServer(sn, false);
1177  }
1178
1179  @Override
1180  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1181    return compactRegionServer(sn, true);
1182  }
1183
1184  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1185    CompletableFuture<Void> future = new CompletableFuture<>();
1186    addListener(getRegions(sn), (hRegionInfos, err) -> {
1187      if (err != null) {
1188        future.completeExceptionally(err);
1189        return;
1190      }
1191      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1192      if (hRegionInfos != null) {
1193        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1194      }
1195      addListener(CompletableFuture.allOf(
1196        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1197          if (err2 != null) {
1198            future.completeExceptionally(err2);
1199          } else {
1200            future.complete(ret);
1201          }
1202        });
1203    });
1204    return future;
1205  }
1206
1207  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1208    boolean major) {
1209    CompletableFuture<Void> future = new CompletableFuture<>();
1210    addListener(getRegionLocation(regionName), (location, err) -> {
1211      if (err != null) {
1212        future.completeExceptionally(err);
1213        return;
1214      }
1215      ServerName serverName = location.getServerName();
1216      if (serverName == null) {
1217        future
1218          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1219        return;
1220      }
1221      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1222        (ret, err2) -> {
1223          if (err2 != null) {
1224            future.completeExceptionally(err2);
1225          } else {
1226            future.complete(ret);
1227          }
1228        });
1229    });
1230    return future;
1231  }
1232
1233  /**
1234   * List all region locations for the specific table.
1235   */
1236  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1237    if (TableName.META_TABLE_NAME.equals(tableName)) {
1238      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1239      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1240        if (err != null) {
1241          future.completeExceptionally(err);
1242        } else if (
1243          metaRegions == null || metaRegions.isEmpty()
1244            || metaRegions.getDefaultRegionLocation() == null
1245        ) {
1246          future.completeExceptionally(new IOException("meta region does not found"));
1247        } else {
1248          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1249        }
1250      });
1251      return future;
1252    } else {
1253      // For non-meta table, we fetch all locations by scanning hbase:meta table
1254      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1255    }
1256  }
1257
1258  /**
1259   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1260   */
1261  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1262    CompactType compactType) {
1263    CompletableFuture<Void> future = new CompletableFuture<>();
1264
1265    switch (compactType) {
1266      case MOB:
1267        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1268          if (err != null) {
1269            future.completeExceptionally(err);
1270            return;
1271          }
1272          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1273          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1274            if (err2 != null) {
1275              future.completeExceptionally(err2);
1276            } else {
1277              future.complete(ret);
1278            }
1279          });
1280        });
1281        break;
1282      case NORMAL:
1283        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1284          if (err != null) {
1285            future.completeExceptionally(err);
1286            return;
1287          }
1288          if (locations == null || locations.isEmpty()) {
1289            future.completeExceptionally(new TableNotFoundException(tableName));
1290          }
1291          CompletableFuture<?>[] compactFutures =
1292            locations.stream().filter(l -> l.getRegion() != null)
1293              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1294              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1295              .toArray(CompletableFuture<?>[]::new);
1296          // future complete unless all of the compact futures are completed.
1297          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1298            if (err2 != null) {
1299              future.completeExceptionally(err2);
1300            } else {
1301              future.complete(ret);
1302            }
1303          });
1304        });
1305        break;
1306      default:
1307        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1308    }
1309    return future;
1310  }
1311
1312  /**
1313   * Compact the region at specific region server.
1314   */
1315  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1316    final boolean major, byte[] columnFamily) {
1317    return this.<Void> newAdminCaller().serverName(sn)
1318      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1319        Void> adminCall(controller, stub,
1320          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1321          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1322      .call();
1323  }
1324
1325  private byte[] toEncodeRegionName(byte[] regionName) {
1326    return RegionInfo.isEncodedRegionName(regionName)
1327      ? regionName
1328      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1329  }
1330
1331  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1332    CompletableFuture<TableName> result) {
1333    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1334      if (err != null) {
1335        result.completeExceptionally(err);
1336        return;
1337      }
1338      RegionInfo regionInfo = location.getRegion();
1339      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1340        result.completeExceptionally(
1341          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1342        return;
1343      }
1344      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1345        if (!tableName.get().equals(regionInfo.getTable())) {
1346          // tables of this two region should be same.
1347          result.completeExceptionally(
1348            new IllegalArgumentException("Cannot merge regions from two different tables "
1349              + tableName.get() + " and " + regionInfo.getTable()));
1350        } else {
1351          result.complete(tableName.get());
1352        }
1353      }
1354    });
1355  }
1356
1357  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1358    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1359    CompletableFuture<TableName> future = new CompletableFuture<>();
1360    for (byte[] encodedRegionName : encodedRegionNames) {
1361      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1362    }
1363    return future;
1364  }
1365
1366  @Override
1367  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1368    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1369  }
1370
1371  @Override
1372  public CompletableFuture<Boolean> isMergeEnabled() {
1373    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1374  }
1375
1376  @Override
1377  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1378    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1379  }
1380
1381  @Override
1382  public CompletableFuture<Boolean> isSplitEnabled() {
1383    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1384  }
1385
1386  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1387    MasterSwitchType switchType) {
1388    SetSplitOrMergeEnabledRequest request =
1389      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1390    return this.<Boolean> newMasterCaller()
1391      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1392        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1393          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1394          (resp) -> resp.getPrevValueList().get(0)))
1395      .call();
1396  }
1397
1398  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1399    IsSplitOrMergeEnabledRequest request =
1400      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1401    return this.<Boolean> newMasterCaller()
1402      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1403        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1404          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1405      .call();
1406  }
1407
1408  @Override
1409  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1410    if (nameOfRegionsToMerge.size() < 2) {
1411      return failedFuture(new IllegalArgumentException(
1412        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1413    }
1414    CompletableFuture<Void> future = new CompletableFuture<>();
1415    byte[][] encodedNameOfRegionsToMerge =
1416      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1417
1418    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1419      if (err != null) {
1420        future.completeExceptionally(err);
1421        return;
1422      }
1423
1424      final MergeTableRegionsRequest request;
1425      try {
1426        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1427          forcible, ng.getNonceGroup(), ng.newNonce());
1428      } catch (DeserializationException e) {
1429        future.completeExceptionally(e);
1430        return;
1431      }
1432
1433      addListener(
1434        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1435          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1436        (ret, err2) -> {
1437          if (err2 != null) {
1438            future.completeExceptionally(err2);
1439          } else {
1440            future.complete(ret);
1441          }
1442        });
1443    });
1444    return future;
1445  }
1446
1447  @Override
1448  public CompletableFuture<Void> split(TableName tableName) {
1449    CompletableFuture<Void> future = new CompletableFuture<>();
1450    addListener(tableExists(tableName), (exist, error) -> {
1451      if (error != null) {
1452        future.completeExceptionally(error);
1453        return;
1454      }
1455      if (!exist) {
1456        future.completeExceptionally(new TableNotFoundException(tableName));
1457        return;
1458      }
1459      addListener(metaTable
1460        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1461          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1462            ClientMetaTableAccessor.QueryType.REGION))
1463          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1464            ClientMetaTableAccessor.QueryType.REGION))),
1465        (results, err2) -> {
1466          if (err2 != null) {
1467            future.completeExceptionally(err2);
1468            return;
1469          }
1470          if (results != null && !results.isEmpty()) {
1471            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1472            for (Result r : results) {
1473              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1474                continue;
1475              }
1476              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1477              if (rl != null) {
1478                for (HRegionLocation h : rl.getRegionLocations()) {
1479                  if (h != null && h.getServerName() != null) {
1480                    RegionInfo hri = h.getRegion();
1481                    if (
1482                      hri == null || hri.isSplitParent()
1483                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1484                    ) {
1485                      continue;
1486                    }
1487                    splitFutures.add(split(hri, null));
1488                  }
1489                }
1490              }
1491            }
1492            addListener(
1493              CompletableFuture
1494                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1495              (ret, exception) -> {
1496                if (exception != null) {
1497                  future.completeExceptionally(exception);
1498                  return;
1499                }
1500                future.complete(ret);
1501              });
1502          } else {
1503            future.complete(null);
1504          }
1505        });
1506    });
1507    return future;
1508  }
1509
1510  @Override
1511  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1512    CompletableFuture<Void> result = new CompletableFuture<>();
1513    if (splitPoint == null) {
1514      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1515    }
1516    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1517      (loc, err) -> {
1518        if (err != null) {
1519          result.completeExceptionally(err);
1520        } else if (loc == null || loc.getRegion() == null) {
1521          result.completeExceptionally(new IllegalArgumentException(
1522            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1523        } else {
1524          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1525            if (err2 != null) {
1526              result.completeExceptionally(err2);
1527            } else {
1528              result.complete(ret);
1529            }
1530
1531          });
1532        }
1533      });
1534    return result;
1535  }
1536
1537  @Override
1538  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1539    CompletableFuture<Void> future = new CompletableFuture<>();
1540    addListener(getRegionLocation(regionName), (location, err) -> {
1541      if (err != null) {
1542        future.completeExceptionally(err);
1543        return;
1544      }
1545      RegionInfo regionInfo = location.getRegion();
1546      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1547        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1548          + "Replicas are auto-split when their primary is split."));
1549        return;
1550      }
1551      ServerName serverName = location.getServerName();
1552      if (serverName == null) {
1553        future
1554          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1555        return;
1556      }
1557      addListener(split(regionInfo, null), (ret, err2) -> {
1558        if (err2 != null) {
1559          future.completeExceptionally(err2);
1560        } else {
1561          future.complete(ret);
1562        }
1563      });
1564    });
1565    return future;
1566  }
1567
1568  @Override
1569  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1570    Preconditions.checkNotNull(splitPoint,
1571      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1572    CompletableFuture<Void> future = new CompletableFuture<>();
1573    addListener(getRegionLocation(regionName), (location, err) -> {
1574      if (err != null) {
1575        future.completeExceptionally(err);
1576        return;
1577      }
1578      RegionInfo regionInfo = location.getRegion();
1579      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1580        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1581          + "Replicas are auto-split when their primary is split."));
1582        return;
1583      }
1584      ServerName serverName = location.getServerName();
1585      if (serverName == null) {
1586        future
1587          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1588        return;
1589      }
1590      if (
1591        regionInfo.getStartKey() != null
1592          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1593      ) {
1594        future.completeExceptionally(
1595          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1596        return;
1597      }
1598      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1599        if (err2 != null) {
1600          future.completeExceptionally(err2);
1601        } else {
1602          future.complete(ret);
1603        }
1604      });
1605    });
1606    return future;
1607  }
1608
1609  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1610    CompletableFuture<Void> future = new CompletableFuture<>();
1611    TableName tableName = hri.getTable();
1612    final SplitTableRegionRequest request;
1613    try {
1614      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1615        ng.newNonce());
1616    } catch (DeserializationException e) {
1617      future.completeExceptionally(e);
1618      return future;
1619    }
1620
1621    addListener(
1622      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1623        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1624      (ret, err2) -> {
1625        if (err2 != null) {
1626          future.completeExceptionally(err2);
1627        } else {
1628          future.complete(ret);
1629        }
1630      });
1631    return future;
1632  }
1633
1634  @Override
1635  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1636    CompletableFuture<Void> future = new CompletableFuture<>();
1637    addListener(getRegionLocation(regionName), (location, err) -> {
1638      if (err != null) {
1639        future.completeExceptionally(err);
1640        return;
1641      }
1642      RegionInfo regionInfo = location.getRegion();
1643      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1644        future.completeExceptionally(new IllegalArgumentException(
1645          "Can't truncate replicas directly.Replicas are auto-truncated "
1646            + "when their primary is truncated."));
1647        return;
1648      }
1649      ServerName serverName = location.getServerName();
1650      if (serverName == null) {
1651        future
1652          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1653        return;
1654      }
1655      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1656        if (err2 != null) {
1657          future.completeExceptionally(err2);
1658        } else {
1659          future.complete(ret);
1660        }
1661      });
1662    });
1663    return future;
1664  }
1665
1666  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1667    CompletableFuture<Void> future = new CompletableFuture<>();
1668    TableName tableName = hri.getTable();
1669    final MasterProtos.TruncateRegionRequest request;
1670    try {
1671      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1672    } catch (DeserializationException e) {
1673      future.completeExceptionally(e);
1674      return future;
1675    }
1676    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1677      MasterProtos.TruncateRegionResponse::getProcId,
1678      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1679        if (err2 != null) {
1680          future.completeExceptionally(err2);
1681        } else {
1682          future.complete(ret);
1683        }
1684      });
1685    return future;
1686  }
1687
1688  @Override
1689  public CompletableFuture<Void> assign(byte[] regionName) {
1690    CompletableFuture<Void> future = new CompletableFuture<>();
1691    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1692      if (err != null) {
1693        future.completeExceptionally(err);
1694        return;
1695      }
1696      addListener(
1697        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1698          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1699            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1700            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1701          .call(),
1702        (ret, err2) -> {
1703          if (err2 != null) {
1704            future.completeExceptionally(err2);
1705          } else {
1706            future.complete(ret);
1707          }
1708        });
1709    });
1710    return future;
1711  }
1712
1713  @Override
1714  public CompletableFuture<Void> unassign(byte[] regionName) {
1715    CompletableFuture<Void> future = new CompletableFuture<>();
1716    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1717      if (err != null) {
1718        future.completeExceptionally(err);
1719        return;
1720      }
1721      addListener(
1722        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1723          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1724            Void> call(controller, stub,
1725              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1726              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1727          .call(),
1728        (ret, err2) -> {
1729          if (err2 != null) {
1730            future.completeExceptionally(err2);
1731          } else {
1732            future.complete(ret);
1733          }
1734        });
1735    });
1736    return future;
1737  }
1738
1739  @Override
1740  public CompletableFuture<Void> offline(byte[] regionName) {
1741    CompletableFuture<Void> future = new CompletableFuture<>();
1742    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1743      if (err != null) {
1744        future.completeExceptionally(err);
1745        return;
1746      }
1747      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1748        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1749          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1750          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1751        .call(), (ret, err2) -> {
1752          if (err2 != null) {
1753            future.completeExceptionally(err2);
1754          } else {
1755            future.complete(ret);
1756          }
1757        });
1758    });
1759    return future;
1760  }
1761
1762  @Override
1763  public CompletableFuture<Void> move(byte[] regionName) {
1764    CompletableFuture<Void> future = new CompletableFuture<>();
1765    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1766      if (err != null) {
1767        future.completeExceptionally(err);
1768        return;
1769      }
1770      addListener(
1771        moveRegion(regionInfo,
1772          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1773        (ret, err2) -> {
1774          if (err2 != null) {
1775            future.completeExceptionally(err2);
1776          } else {
1777            future.complete(ret);
1778          }
1779        });
1780    });
1781    return future;
1782  }
1783
1784  @Override
1785  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1786    Preconditions.checkNotNull(destServerName,
1787      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1788    CompletableFuture<Void> future = new CompletableFuture<>();
1789    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1790      if (err != null) {
1791        future.completeExceptionally(err);
1792        return;
1793      }
1794      addListener(
1795        moveRegion(regionInfo, RequestConverter
1796          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1797        (ret, err2) -> {
1798          if (err2 != null) {
1799            future.completeExceptionally(err2);
1800          } else {
1801            future.complete(ret);
1802          }
1803        });
1804    });
1805    return future;
1806  }
1807
1808  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1809    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1810      .action(
1811        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1812          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1813      .call();
1814  }
1815
1816  @Override
1817  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1818    return this.<Void> newMasterCaller()
1819      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1820        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1821        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1822      .call();
1823  }
1824
1825  @Override
1826  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1827    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1828    Scan scan = QuotaTableUtil.makeScan(filter);
1829    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1830      new AdvancedScanResultConsumer() {
1831        List<QuotaSettings> settings = new ArrayList<>();
1832
1833        @Override
1834        public void onNext(Result[] results, ScanController controller) {
1835          for (Result result : results) {
1836            try {
1837              QuotaTableUtil.parseResultToCollection(result, settings);
1838            } catch (IOException e) {
1839              controller.terminate();
1840              future.completeExceptionally(e);
1841            }
1842          }
1843        }
1844
1845        @Override
1846        public void onError(Throwable error) {
1847          future.completeExceptionally(error);
1848        }
1849
1850        @Override
1851        public void onComplete() {
1852          future.complete(settings);
1853        }
1854      });
1855    return future;
1856  }
1857
1858  @Override
1859  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1860    boolean enabled) {
1861    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1862      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1863      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1864      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1865  }
1866
1867  @Override
1868  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1869    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1870      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1871      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1872      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1873  }
1874
1875  @Override
1876  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1877    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1878      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1879      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1880      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1881  }
1882
1883  @Override
1884  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1885    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1886      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1887      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1888      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1889  }
1890
1891  @Override
1892  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1893    return this.<ReplicationPeerConfig> newMasterCaller()
1894      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1895        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1896          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1897          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1898          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1899      .call();
1900  }
1901
1902  @Override
1903  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1904    ReplicationPeerConfig peerConfig) {
1905    return this.<UpdateReplicationPeerConfigRequest,
1906      UpdateReplicationPeerConfigResponse> procedureCall(
1907        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1908        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1909        (resp) -> resp.getProcId(),
1910        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1911  }
1912
1913  @Override
1914  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1915    SyncReplicationState clusterState) {
1916    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1917      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1918        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1919          clusterState),
1920        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1921        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1922          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1923  }
1924
1925  @Override
1926  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1927    Map<TableName, List<String>> tableCfs) {
1928    if (tableCfs == null) {
1929      return failedFuture(new ReplicationException("tableCfs is null"));
1930    }
1931
1932    CompletableFuture<Void> future = new CompletableFuture<Void>();
1933    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1934      if (!completeExceptionally(future, error)) {
1935        ReplicationPeerConfig newPeerConfig =
1936          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1937        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1938          if (!completeExceptionally(future, error)) {
1939            future.complete(result);
1940          }
1941        });
1942      }
1943    });
1944    return future;
1945  }
1946
1947  @Override
1948  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1949    Map<TableName, List<String>> tableCfs) {
1950    if (tableCfs == null) {
1951      return failedFuture(new ReplicationException("tableCfs is null"));
1952    }
1953
1954    CompletableFuture<Void> future = new CompletableFuture<Void>();
1955    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1956      if (!completeExceptionally(future, error)) {
1957        ReplicationPeerConfig newPeerConfig = null;
1958        try {
1959          newPeerConfig = ReplicationPeerConfigUtil
1960            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1961        } catch (ReplicationException e) {
1962          future.completeExceptionally(e);
1963          return;
1964        }
1965        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1966          if (!completeExceptionally(future, error)) {
1967            future.complete(result);
1968          }
1969        });
1970      }
1971    });
1972    return future;
1973  }
1974
1975  @Override
1976  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1977    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1978  }
1979
1980  @Override
1981  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1982    Preconditions.checkNotNull(pattern,
1983      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1984    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1985  }
1986
1987  private CompletableFuture<List<ReplicationPeerDescription>>
1988    listReplicationPeers(ListReplicationPeersRequest request) {
1989    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1990      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1991        List<ReplicationPeerDescription>> call(controller, stub, request,
1992          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1993          (resp) -> resp.getPeerDescList().stream()
1994            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1995            .collect(Collectors.toList())))
1996      .call();
1997  }
1998
1999  @Override
2000  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2001    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2002    addListener(listTableDescriptors(), (tables, error) -> {
2003      if (!completeExceptionally(future, error)) {
2004        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2005        tables.forEach(table -> {
2006          Map<String, Integer> cfs = new HashMap<>();
2007          Stream.of(table.getColumnFamilies())
2008            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2009            .forEach(column -> {
2010              cfs.put(column.getNameAsString(), column.getScope());
2011            });
2012          if (!cfs.isEmpty()) {
2013            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2014          }
2015        });
2016        future.complete(replicatedTableCFs);
2017      }
2018    });
2019    return future;
2020  }
2021
2022  @Override
2023  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2024    SnapshotProtos.SnapshotDescription snapshot =
2025      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2026    try {
2027      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2028    } catch (IllegalArgumentException e) {
2029      return failedFuture(e);
2030    }
2031    CompletableFuture<Void> future = new CompletableFuture<>();
2032    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2033      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2034    addListener(this.<SnapshotResponse> newMasterCaller()
2035      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2036        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2037      .call(), (resp, err) -> {
2038        if (err != null) {
2039          future.completeExceptionally(err);
2040          return;
2041        }
2042        waitSnapshotFinish(snapshotDesc, future, resp);
2043      });
2044    return future;
2045  }
2046
2047  // This is for keeping compatibility with old implementation.
2048  // If there is a procId field in the response, then the snapshot will be operated with a
2049  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
2050  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
2051    SnapshotResponse resp) {
2052    if (resp.hasProcId()) {
2053      getProcedureResult(resp.getProcId(), future, 0);
2054      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
2055    } else {
2056      long expectedTimeout = resp.getExpectedTimeout();
2057      TimerTask pollingTask = new TimerTask() {
2058        int tries = 0;
2059        long startTime = EnvironmentEdgeManager.currentTime();
2060        long endTime = startTime + expectedTimeout;
2061        long maxPauseTime = expectedTimeout / maxAttempts;
2062
2063        @Override
2064        public void run(Timeout timeout) throws Exception {
2065          if (EnvironmentEdgeManager.currentTime() < endTime) {
2066            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
2067              if (err2 != null) {
2068                future.completeExceptionally(err2);
2069              } else if (done) {
2070                future.complete(null);
2071              } else {
2072                // retry again after pauseTime.
2073                long pauseTime =
2074                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2075                pauseTime = Math.min(pauseTime, maxPauseTime);
2076                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
2077              }
2078            });
2079          } else {
2080            future
2081              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
2082                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
2083          }
2084        }
2085      };
2086      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2087    }
2088  }
2089
2090  @Override
2091  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2092    return this.<Boolean> newMasterCaller()
2093      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2094        Boolean> call(controller, stub,
2095          IsSnapshotDoneRequest.newBuilder()
2096            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2097          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2098      .call();
2099  }
2100
2101  @Override
2102  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2103    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2104      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2105      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2106    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2107  }
2108
2109  @Override
2110  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2111    boolean restoreAcl) {
2112    CompletableFuture<Void> future = new CompletableFuture<>();
2113    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2114      if (err != null) {
2115        future.completeExceptionally(err);
2116        return;
2117      }
2118      TableName tableName = null;
2119      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2120        for (SnapshotDescription snap : snapshotDescriptions) {
2121          if (snap.getName().equals(snapshotName)) {
2122            tableName = snap.getTableName();
2123            break;
2124          }
2125        }
2126      }
2127      if (tableName == null) {
2128        future.completeExceptionally(
2129          new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist."));
2130        return;
2131      }
2132      final TableName finalTableName = tableName;
2133      addListener(tableExists(finalTableName), (exists, err2) -> {
2134        if (err2 != null) {
2135          future.completeExceptionally(err2);
2136        } else if (!exists) {
2137          // if table does not exist, then just clone snapshot into new table.
2138          completeConditionalOnFuture(future,
2139            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2140        } else {
2141          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2142            if (err4 != null) {
2143              future.completeExceptionally(err4);
2144            } else if (!disabled) {
2145              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2146            } else {
2147              completeConditionalOnFuture(future,
2148                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2149            }
2150          });
2151        }
2152      });
2153    });
2154    return future;
2155  }
2156
2157  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2158    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2159    if (takeFailSafeSnapshot) {
2160      CompletableFuture<Void> future = new CompletableFuture<>();
2161      // Step.1 Take a snapshot of the current state
2162      String failSafeSnapshotSnapshotNameFormat =
2163        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2164          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2165      final String failSafeSnapshotSnapshotName =
2166        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2167          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2168          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2169      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2170      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2171        if (err != null) {
2172          future.completeExceptionally(err);
2173        } else {
2174          // Step.2 Restore snapshot
2175          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2176            (void2, err2) -> {
2177              if (err2 != null) {
2178                // Step.3.a Something went wrong during the restore and try to rollback.
2179                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2180                  restoreAcl, null), (void3, err3) -> {
2181                    if (err3 != null) {
2182                      future.completeExceptionally(err3);
2183                    } else {
2184                      // If fail to restore snapshot but rollback successfully, delete the
2185                      // restore-failsafe snapshot.
2186                      LOG.info(
2187                        "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2188                      addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> {
2189                        if (err4 != null) {
2190                          LOG.error("Unable to remove the failsafe snapshot: {}",
2191                            failSafeSnapshotSnapshotName, err4);
2192                        }
2193                        String msg =
2194                          "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot="
2195                            + failSafeSnapshotSnapshotName + " succeeded.";
2196                        LOG.error(msg);
2197                        future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2198                      });
2199                    }
2200                  });
2201              } else {
2202                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2203                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2204                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2205                  if (err3 != null) {
2206                    LOG.error(
2207                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2208                      err3);
2209                  }
2210                  future.complete(ret3);
2211                });
2212              }
2213            });
2214        }
2215      });
2216      return future;
2217    } else {
2218      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2219    }
2220  }
2221
2222  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2223    CompletableFuture<T> parentFuture) {
2224    addListener(parentFuture, (res, err) -> {
2225      if (err != null) {
2226        dependentFuture.completeExceptionally(err);
2227      } else {
2228        dependentFuture.complete(res);
2229      }
2230    });
2231  }
2232
2233  @Override
2234  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2235    boolean restoreAcl, String customSFT) {
2236    CompletableFuture<Void> future = new CompletableFuture<>();
2237    addListener(tableExists(tableName), (exists, err) -> {
2238      if (err != null) {
2239        future.completeExceptionally(err);
2240      } else if (exists) {
2241        future.completeExceptionally(new TableExistsException(tableName));
2242      } else {
2243        completeConditionalOnFuture(future,
2244          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2245      }
2246    });
2247    return future;
2248  }
2249
2250  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2251    boolean restoreAcl, String customSFT) {
2252    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2253      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2254    try {
2255      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2256    } catch (IllegalArgumentException e) {
2257      return failedFuture(e);
2258    }
2259    RestoreSnapshotRequest.Builder builder =
2260      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2261        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2262    if (customSFT != null) {
2263      builder.setCustomSFT(customSFT);
2264    }
2265    return waitProcedureResult(this.<Long> newMasterCaller()
2266      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2267        Long> call(controller, stub, builder.build(),
2268          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2269      .call());
2270  }
2271
2272  @Override
2273  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2274    return getCompletedSnapshots(null);
2275  }
2276
2277  @Override
2278  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2279    Preconditions.checkNotNull(pattern,
2280      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2281    return getCompletedSnapshots(pattern);
2282  }
2283
2284  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2285    return this.<List<SnapshotDescription>> newMasterCaller()
2286      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2287        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2288          GetCompletedSnapshotsRequest.newBuilder().build(),
2289          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2290          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2291      .call();
2292  }
2293
2294  @Override
2295  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2296    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2297      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2298    return getCompletedSnapshots(tableNamePattern, null);
2299  }
2300
2301  @Override
2302  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2303    Pattern snapshotNamePattern) {
2304    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2305      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2306    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2307      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2308    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2309  }
2310
2311  private CompletableFuture<List<SnapshotDescription>>
2312    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2313    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2314    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2315      if (err != null) {
2316        future.completeExceptionally(err);
2317        return;
2318      }
2319      if (tableNames == null || tableNames.size() <= 0) {
2320        future.complete(Collections.emptyList());
2321        return;
2322      }
2323      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2324        if (err2 != null) {
2325          future.completeExceptionally(err2);
2326          return;
2327        }
2328        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2329          future.complete(Collections.emptyList());
2330          return;
2331        }
2332        future.complete(snapshotDescList.stream()
2333          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2334          .collect(Collectors.toList()));
2335      });
2336    });
2337    return future;
2338  }
2339
2340  @Override
2341  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2342    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2343  }
2344
2345  @Override
2346  public CompletableFuture<Void> deleteSnapshots() {
2347    return internalDeleteSnapshots(null, null);
2348  }
2349
2350  @Override
2351  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2352    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2353      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2354    return internalDeleteSnapshots(null, snapshotNamePattern);
2355  }
2356
2357  @Override
2358  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2359    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2360      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2361    return internalDeleteSnapshots(tableNamePattern, null);
2362  }
2363
2364  @Override
2365  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2366    Pattern snapshotNamePattern) {
2367    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2368      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2369    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2370      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2371    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2372  }
2373
2374  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2375    Pattern snapshotNamePattern) {
2376    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2377    if (tableNamePattern == null) {
2378      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2379    } else {
2380      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2381    }
2382    CompletableFuture<Void> future = new CompletableFuture<>();
2383    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2384      if (err != null) {
2385        future.completeExceptionally(err);
2386        return;
2387      }
2388      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2389        future.complete(null);
2390        return;
2391      }
2392      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2393        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2394          if (e != null) {
2395            future.completeExceptionally(e);
2396          } else {
2397            future.complete(v);
2398          }
2399        });
2400    });
2401    return future;
2402  }
2403
2404  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2405    return this.<Void> newMasterCaller()
2406      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2407        controller, stub,
2408        DeleteSnapshotRequest.newBuilder()
2409          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2410        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2411      .call();
2412  }
2413
2414  @Override
2415  public CompletableFuture<Void> execProcedure(String signature, String instance,
2416    Map<String, String> props) {
2417    CompletableFuture<Void> future = new CompletableFuture<>();
2418    ProcedureDescription procDesc =
2419      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2420    addListener(this.<Long> newMasterCaller()
2421      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2422        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2423        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2424      .call(), (expectedTimeout, err) -> {
2425        if (err != null) {
2426          future.completeExceptionally(err);
2427          return;
2428        }
2429        TimerTask pollingTask = new TimerTask() {
2430          int tries = 0;
2431          long startTime = EnvironmentEdgeManager.currentTime();
2432          long endTime = startTime + expectedTimeout;
2433          long maxPauseTime = expectedTimeout / maxAttempts;
2434
2435          @Override
2436          public void run(Timeout timeout) throws Exception {
2437            if (EnvironmentEdgeManager.currentTime() < endTime) {
2438              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2439                if (err2 != null) {
2440                  future.completeExceptionally(err2);
2441                  return;
2442                }
2443                if (done) {
2444                  future.complete(null);
2445                } else {
2446                  // retry again after pauseTime.
2447                  long pauseTime =
2448                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2449                  pauseTime = Math.min(pauseTime, maxPauseTime);
2450                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2451                    TimeUnit.MICROSECONDS);
2452                }
2453              });
2454            } else {
2455              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2456                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2457            }
2458          }
2459        };
2460        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2461        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2462      });
2463    return future;
2464  }
2465
2466  @Override
2467  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2468    Map<String, String> props) {
2469    ProcedureDescription proDesc =
2470      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2471    return this.<byte[]> newMasterCaller()
2472      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2473        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2474        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2475        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2476      .call();
2477  }
2478
2479  @Override
2480  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2481    Map<String, String> props) {
2482    ProcedureDescription proDesc =
2483      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2484    return this.<Boolean> newMasterCaller()
2485      .action(
2486        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2487          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2488          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2489      .call();
2490  }
2491
2492  @Override
2493  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2494    return this.<Boolean> newMasterCaller().action(
2495      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2496        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2497        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2498      .call();
2499  }
2500
2501  @Override
2502  public CompletableFuture<String> getProcedures() {
2503    return this.<String> newMasterCaller()
2504      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2505        controller, stub, GetProceduresRequest.newBuilder().build(),
2506        (s, c, req, done) -> s.getProcedures(c, req, done),
2507        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2508      .call();
2509  }
2510
2511  @Override
2512  public CompletableFuture<String> getLocks() {
2513    return this.<String> newMasterCaller()
2514      .action(
2515        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2516          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2517          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2518      .call();
2519  }
2520
2521  @Override
2522  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2523    boolean offload) {
2524    return this.<Void> newMasterCaller()
2525      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2526        DecommissionRegionServersResponse, Void> call(controller, stub,
2527          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2528          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2529      .call();
2530  }
2531
2532  @Override
2533  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2534    return this.<List<ServerName>> newMasterCaller()
2535      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2536        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2537          ListDecommissionedRegionServersRequest.newBuilder().build(),
2538          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2539          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2540            .collect(Collectors.toList())))
2541      .call();
2542  }
2543
2544  @Override
2545  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2546    List<byte[]> encodedRegionNames) {
2547    return this.<Void> newMasterCaller()
2548      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2549        RecommissionRegionServerResponse, Void> call(controller, stub,
2550          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2551          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2552      .call();
2553  }
2554
2555  /**
2556   * Get the region location for the passed region name. The region name may be a full region name
2557   * or encoded region name. If the region does not found, then it'll throw an
2558   * UnknownRegionException wrapped by a {@link CompletableFuture}
2559   * @param regionNameOrEncodedRegionName region name or encoded region name
2560   * @return region location, wrapped by a {@link CompletableFuture}
2561   */
2562  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2563    if (regionNameOrEncodedRegionName == null) {
2564      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2565    }
2566
2567    CompletableFuture<Optional<HRegionLocation>> future;
2568    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2569      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2570      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2571        // old format encodedName, should be meta region
2572        future = connection.registry.getMetaRegionLocations()
2573          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2574            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2575      } else {
2576        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2577          regionNameOrEncodedRegionName);
2578      }
2579    } else {
2580      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2581      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2582      RegionInfo regionInfo;
2583      try {
2584        regionInfo =
2585          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2586      } catch (IOException ioe) {
2587        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2588      }
2589
2590      if (regionInfo.isMetaRegion()) {
2591        future = connection.registry.getMetaRegionLocations()
2592          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2593            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2594            .findFirst());
2595      } else {
2596        future =
2597          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2598      }
2599    }
2600
2601    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2602    addListener(future, (location, err) -> {
2603      if (err != null) {
2604        returnedFuture.completeExceptionally(err);
2605        return;
2606      }
2607      if (!location.isPresent() || location.get().getRegion() == null) {
2608        returnedFuture.completeExceptionally(
2609          new UnknownRegionException("Invalid region name or encoded region name: "
2610            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2611      } else {
2612        returnedFuture.complete(location.get());
2613      }
2614    });
2615    return returnedFuture;
2616  }
2617
2618  /**
2619   * Get the region info for the passed region name. The region name may be a full region name or
2620   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2621   * wrapped by a {@link CompletableFuture}
2622   * @return region info, wrapped by a {@link CompletableFuture}
2623   */
2624  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2625    if (regionNameOrEncodedRegionName == null) {
2626      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2627    }
2628
2629    if (
2630      Bytes.equals(regionNameOrEncodedRegionName,
2631        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2632        || Bytes.equals(regionNameOrEncodedRegionName,
2633          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2634    ) {
2635      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2636    }
2637
2638    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2639    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2640      if (err != null) {
2641        future.completeExceptionally(err);
2642      } else {
2643        future.complete(location.getRegion());
2644      }
2645    });
2646    return future;
2647  }
2648
2649  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2650    if (numRegions < 3) {
2651      throw new IllegalArgumentException("Must create at least three regions");
2652    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2653      throw new IllegalArgumentException("Start key must be smaller than end key");
2654    }
2655    if (numRegions == 3) {
2656      return new byte[][] { startKey, endKey };
2657    }
2658    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2659    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2660      throw new IllegalArgumentException("Unable to split key range into enough regions");
2661    }
2662    return splitKeys;
2663  }
2664
2665  private void verifySplitKeys(byte[][] splitKeys) {
2666    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2667    // Verify there are no duplicate split keys
2668    byte[] lastKey = null;
2669    for (byte[] splitKey : splitKeys) {
2670      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2671        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2672      }
2673      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2674        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2675          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2676      }
2677      lastKey = splitKey;
2678    }
2679  }
2680
2681  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2682
2683    abstract void onFinished();
2684
2685    abstract void onError(Throwable error);
2686
2687    @Override
2688    public void accept(Void v, Throwable error) {
2689      if (error != null) {
2690        onError(error);
2691        return;
2692      }
2693      onFinished();
2694    }
2695  }
2696
2697  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2698    protected final TableName tableName;
2699
2700    TableProcedureBiConsumer(TableName tableName) {
2701      this.tableName = tableName;
2702    }
2703
2704    abstract String getOperationType();
2705
2706    String getDescription() {
2707      return "Operation: " + getOperationType() + ", " + "Table Name: "
2708        + tableName.getNameWithNamespaceInclAsString();
2709    }
2710
2711    @Override
2712    void onFinished() {
2713      LOG.info(getDescription() + " completed");
2714    }
2715
2716    @Override
2717    void onError(Throwable error) {
2718      LOG.info(getDescription() + " failed with " + error.getMessage());
2719    }
2720  }
2721
2722  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2723    protected final String namespaceName;
2724
2725    NamespaceProcedureBiConsumer(String namespaceName) {
2726      this.namespaceName = namespaceName;
2727    }
2728
2729    abstract String getOperationType();
2730
2731    String getDescription() {
2732      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2733    }
2734
2735    @Override
2736    void onFinished() {
2737      LOG.info(getDescription() + " completed");
2738    }
2739
2740    @Override
2741    void onError(Throwable error) {
2742      LOG.info(getDescription() + " failed with " + error.getMessage());
2743    }
2744  }
2745
2746  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2747
2748    CreateTableProcedureBiConsumer(TableName tableName) {
2749      super(tableName);
2750    }
2751
2752    @Override
2753    String getOperationType() {
2754      return "CREATE";
2755    }
2756  }
2757
2758  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2759
2760    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2761      super(tableName);
2762    }
2763
2764    @Override
2765    String getOperationType() {
2766      return "MODIFY";
2767    }
2768  }
2769
2770  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2771    extends TableProcedureBiConsumer {
2772
2773    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2774      super(tableName);
2775    }
2776
2777    @Override
2778    String getOperationType() {
2779      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2780    }
2781  }
2782
2783  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2784
2785    DeleteTableProcedureBiConsumer(TableName tableName) {
2786      super(tableName);
2787    }
2788
2789    @Override
2790    String getOperationType() {
2791      return "DELETE";
2792    }
2793
2794    @Override
2795    void onFinished() {
2796      connection.getLocator().clearCache(this.tableName);
2797      super.onFinished();
2798    }
2799  }
2800
2801  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2802
2803    TruncateTableProcedureBiConsumer(TableName tableName) {
2804      super(tableName);
2805    }
2806
2807    @Override
2808    String getOperationType() {
2809      return "TRUNCATE";
2810    }
2811  }
2812
2813  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2814
2815    EnableTableProcedureBiConsumer(TableName tableName) {
2816      super(tableName);
2817    }
2818
2819    @Override
2820    String getOperationType() {
2821      return "ENABLE";
2822    }
2823  }
2824
2825  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2826
2827    DisableTableProcedureBiConsumer(TableName tableName) {
2828      super(tableName);
2829    }
2830
2831    @Override
2832    String getOperationType() {
2833      return "DISABLE";
2834    }
2835  }
2836
2837  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2838
2839    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2840      super(tableName);
2841    }
2842
2843    @Override
2844    String getOperationType() {
2845      return "ADD_COLUMN_FAMILY";
2846    }
2847  }
2848
2849  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2850
2851    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2852      super(tableName);
2853    }
2854
2855    @Override
2856    String getOperationType() {
2857      return "DELETE_COLUMN_FAMILY";
2858    }
2859  }
2860
2861  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2862
2863    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2864      super(tableName);
2865    }
2866
2867    @Override
2868    String getOperationType() {
2869      return "MODIFY_COLUMN_FAMILY";
2870    }
2871  }
2872
2873  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2874    extends TableProcedureBiConsumer {
2875
2876    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2877      super(tableName);
2878    }
2879
2880    @Override
2881    String getOperationType() {
2882      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2883    }
2884  }
2885
2886  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2887
2888    FlushTableProcedureBiConsumer(TableName tableName) {
2889      super(tableName);
2890    }
2891
2892    @Override
2893    String getOperationType() {
2894      return "FLUSH";
2895    }
2896  }
2897
2898  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2899
2900    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2901      super(namespaceName);
2902    }
2903
2904    @Override
2905    String getOperationType() {
2906      return "CREATE_NAMESPACE";
2907    }
2908  }
2909
2910  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2911
2912    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2913      super(namespaceName);
2914    }
2915
2916    @Override
2917    String getOperationType() {
2918      return "DELETE_NAMESPACE";
2919    }
2920  }
2921
2922  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2923
2924    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2925      super(namespaceName);
2926    }
2927
2928    @Override
2929    String getOperationType() {
2930      return "MODIFY_NAMESPACE";
2931    }
2932  }
2933
2934  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2935
2936    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2937      super(tableName);
2938    }
2939
2940    @Override
2941    String getOperationType() {
2942      return "MERGE_REGIONS";
2943    }
2944  }
2945
2946  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2947
2948    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2949      super(tableName);
2950    }
2951
2952    @Override
2953    String getOperationType() {
2954      return "SPLIT_REGION";
2955    }
2956  }
2957
2958  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2959
2960    TruncateRegionProcedureBiConsumer(TableName tableName) {
2961      super(tableName);
2962    }
2963
2964    @Override
2965    String getOperationType() {
2966      return "TRUNCATE_REGION";
2967    }
2968  }
2969
2970  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2971    SnapshotProcedureBiConsumer(TableName tableName) {
2972      super(tableName);
2973    }
2974
2975    @Override
2976    String getOperationType() {
2977      return "SNAPSHOT";
2978    }
2979  }
2980
2981  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2982    private final String peerId;
2983    private final Supplier<String> getOperation;
2984
2985    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2986      this.peerId = peerId;
2987      this.getOperation = getOperation;
2988    }
2989
2990    String getDescription() {
2991      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2992    }
2993
2994    @Override
2995    void onFinished() {
2996      LOG.info(getDescription() + " completed");
2997    }
2998
2999    @Override
3000    void onError(Throwable error) {
3001      LOG.info(getDescription() + " failed with " + error.getMessage());
3002    }
3003  }
3004
3005  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
3006    CompletableFuture<Void> future = new CompletableFuture<>();
3007    addListener(procFuture, (procId, error) -> {
3008      if (error != null) {
3009        future.completeExceptionally(error);
3010        return;
3011      }
3012      getProcedureResult(procId, future, 0);
3013    });
3014    return future;
3015  }
3016
3017  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
3018    addListener(
3019      this.<GetProcedureResultResponse> newMasterCaller()
3020        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3021          GetProcedureResultResponse> call(controller, stub,
3022            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3023            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3024        .call(),
3025      (response, error) -> {
3026        if (error != null) {
3027          LOG.warn("failed to get the procedure result procId={}", procId,
3028            ConnectionUtils.translateException(error));
3029          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3030            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3031          return;
3032        }
3033        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3034          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3035            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3036          return;
3037        }
3038        if (response.hasException()) {
3039          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3040          future.completeExceptionally(ioe);
3041        } else {
3042          future.complete(null);
3043        }
3044      });
3045  }
3046
3047  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3048    CompletableFuture<T> future = new CompletableFuture<>();
3049    future.completeExceptionally(error);
3050    return future;
3051  }
3052
3053  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3054    if (error != null) {
3055      future.completeExceptionally(error);
3056      return true;
3057    }
3058    return false;
3059  }
3060
3061  @Override
3062  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3063    return getClusterMetrics(EnumSet.allOf(Option.class));
3064  }
3065
3066  @Override
3067  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3068    return this.<ClusterMetrics> newMasterCaller()
3069      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3070        ClusterMetrics> call(controller, stub,
3071          RequestConverter.buildGetClusterStatusRequest(options),
3072          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3073          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3074      .call();
3075  }
3076
3077  @Override
3078  public CompletableFuture<Void> shutdown() {
3079    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3080      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3081        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3082        resp -> null))
3083      .call();
3084  }
3085
3086  @Override
3087  public CompletableFuture<Void> stopMaster() {
3088    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3089      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3090        controller, stub, StopMasterRequest.newBuilder().build(),
3091        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3092      .call();
3093  }
3094
3095  @Override
3096  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3097    StopServerRequest request = RequestConverter
3098      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3099    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3100      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3101        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3102        resp -> null))
3103      .serverName(serverName).call();
3104  }
3105
3106  @Override
3107  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3108    return this.<Void> newAdminCaller()
3109      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3110        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3111          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3112      .serverName(serverName).call();
3113  }
3114
3115  @Override
3116  public CompletableFuture<Void> updateConfiguration() {
3117    CompletableFuture<Void> future = new CompletableFuture<Void>();
3118    addListener(
3119      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3120      (status, err) -> {
3121        if (err != null) {
3122          future.completeExceptionally(err);
3123        } else {
3124          List<CompletableFuture<Void>> futures = new ArrayList<>();
3125          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3126          futures.add(updateConfiguration(status.getMasterName()));
3127          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3128          addListener(
3129            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3130            (result, err2) -> {
3131              if (err2 != null) {
3132                future.completeExceptionally(err2);
3133              } else {
3134                future.complete(result);
3135              }
3136            });
3137        }
3138      });
3139    return future;
3140  }
3141
3142  @Override
3143  public CompletableFuture<Void> updateConfiguration(String groupName) {
3144    CompletableFuture<Void> future = new CompletableFuture<Void>();
3145    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3146      if (err != null) {
3147        future.completeExceptionally(err);
3148      } else if (rsGroupInfo == null) {
3149        future.completeExceptionally(
3150          new IllegalArgumentException("Group does not exist: " + groupName));
3151      } else {
3152        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3153          if (err2 != null) {
3154            future.completeExceptionally(err2);
3155          } else {
3156            List<CompletableFuture<Void>> futures = new ArrayList<>();
3157            List<ServerName> groupServers = status.getServersName().stream()
3158              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3159            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3160            addListener(
3161              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3162              (result, err3) -> {
3163                if (err3 != null) {
3164                  future.completeExceptionally(err3);
3165                } else {
3166                  future.complete(result);
3167                }
3168              });
3169          }
3170        });
3171      }
3172    });
3173    return future;
3174  }
3175
3176  @Override
3177  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3178    return this.<Void> newAdminCaller()
3179      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3180        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3181          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3182      .serverName(serverName).call();
3183  }
3184
3185  @Override
3186  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3187    return this.<Void> newAdminCaller()
3188      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3189        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3190          RequestConverter.buildClearCompactionQueuesRequest(queues),
3191          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3192      .serverName(serverName).call();
3193  }
3194
3195  @Override
3196  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3197    return this.<List<SecurityCapability>> newMasterCaller()
3198      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3199        List<SecurityCapability>> call(controller, stub,
3200          SecurityCapabilitiesRequest.newBuilder().build(),
3201          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3202          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3203      .call();
3204  }
3205
3206  @Override
3207  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3208    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3209  }
3210
3211  @Override
3212  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3213    TableName tableName) {
3214    Preconditions.checkNotNull(tableName,
3215      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3216    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3217  }
3218
3219  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3220    ServerName serverName) {
3221    return this.<List<RegionMetrics>> newAdminCaller()
3222      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3223        List<RegionMetrics>> adminCall(controller, stub, request,
3224          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3225          RegionMetricsBuilder::toRegionMetrics))
3226      .serverName(serverName).call();
3227  }
3228
3229  @Override
3230  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3231    return this.<Boolean> newMasterCaller()
3232      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3233        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3234          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3235          resp -> resp.getInMaintenanceMode()))
3236      .call();
3237  }
3238
3239  @Override
3240  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3241    CompactType compactType) {
3242    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3243
3244    switch (compactType) {
3245      case MOB:
3246        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3247          if (err != null) {
3248            future.completeExceptionally(err);
3249            return;
3250          }
3251          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3252
3253          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3254            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3255              GetRegionInfoResponse> adminCall(controller, stub,
3256                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3257                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3258            .call(), (resp2, err2) -> {
3259              if (err2 != null) {
3260                future.completeExceptionally(err2);
3261              } else {
3262                if (resp2.hasCompactionState()) {
3263                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3264                } else {
3265                  future.complete(CompactionState.NONE);
3266                }
3267              }
3268            });
3269        });
3270        break;
3271      case NORMAL:
3272        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3273          if (err != null) {
3274            future.completeExceptionally(err);
3275            return;
3276          }
3277          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3278          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3279          locations.stream().filter(loc -> loc.getServerName() != null)
3280            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3281            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3282              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3283                // If any region compaction state is MAJOR_AND_MINOR
3284                // the table compaction state is MAJOR_AND_MINOR, too.
3285                if (err2 != null) {
3286                  future.completeExceptionally(unwrapCompletionException(err2));
3287                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3288                  future.complete(regionState);
3289                } else {
3290                  regionStates.add(regionState);
3291                }
3292              }));
3293            });
3294          addListener(
3295            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3296            (ret, err3) -> {
3297              // If future not completed, check all regions's compaction state
3298              if (!future.isCompletedExceptionally() && !future.isDone()) {
3299                CompactionState state = CompactionState.NONE;
3300                for (CompactionState regionState : regionStates) {
3301                  switch (regionState) {
3302                    case MAJOR:
3303                      if (state == CompactionState.MINOR) {
3304                        future.complete(CompactionState.MAJOR_AND_MINOR);
3305                      } else {
3306                        state = CompactionState.MAJOR;
3307                      }
3308                      break;
3309                    case MINOR:
3310                      if (state == CompactionState.MAJOR) {
3311                        future.complete(CompactionState.MAJOR_AND_MINOR);
3312                      } else {
3313                        state = CompactionState.MINOR;
3314                      }
3315                      break;
3316                    case NONE:
3317                    default:
3318                  }
3319                }
3320                if (!future.isDone()) {
3321                  future.complete(state);
3322                }
3323              }
3324            });
3325        });
3326        break;
3327      default:
3328        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3329    }
3330
3331    return future;
3332  }
3333
3334  @Override
3335  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3336    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3337    addListener(getRegionLocation(regionName), (location, err) -> {
3338      if (err != null) {
3339        future.completeExceptionally(err);
3340        return;
3341      }
3342      ServerName serverName = location.getServerName();
3343      if (serverName == null) {
3344        future
3345          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3346        return;
3347      }
3348      addListener(
3349        this.<GetRegionInfoResponse> newAdminCaller()
3350          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3351            GetRegionInfoResponse> adminCall(controller, stub,
3352              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3353                true),
3354              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3355          .serverName(serverName).call(),
3356        (resp2, err2) -> {
3357          if (err2 != null) {
3358            future.completeExceptionally(err2);
3359          } else {
3360            if (resp2.hasCompactionState()) {
3361              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3362            } else {
3363              future.complete(CompactionState.NONE);
3364            }
3365          }
3366        });
3367    });
3368    return future;
3369  }
3370
3371  @Override
3372  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3373    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3374      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3375    return this.<Optional<Long>> newMasterCaller()
3376      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3377        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3378          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3379          ProtobufUtil::toOptionalTimestamp))
3380      .call();
3381  }
3382
3383  @Override
3384  public CompletableFuture<Optional<Long>>
3385    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3386    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3387    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3388    addListener(getRegionInfo(regionName), (region, err) -> {
3389      if (err != null) {
3390        future.completeExceptionally(err);
3391        return;
3392      }
3393      MajorCompactionTimestampForRegionRequest.Builder builder =
3394        MajorCompactionTimestampForRegionRequest.newBuilder();
3395      builder.setRegion(
3396        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3397      addListener(this.<Optional<Long>> newMasterCaller()
3398        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3399          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3400            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3401            ProtobufUtil::toOptionalTimestamp))
3402        .call(), (timestamp, err2) -> {
3403          if (err2 != null) {
3404            future.completeExceptionally(err2);
3405          } else {
3406            future.complete(timestamp);
3407          }
3408        });
3409    });
3410    return future;
3411  }
3412
3413  @Override
3414  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3415    List<String> serverNamesList) {
3416    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3417    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3418      if (err != null) {
3419        future.completeExceptionally(err);
3420        return;
3421      }
3422      // Accessed by multiple threads.
3423      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3424      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3425      serverNames.stream().forEach(serverName -> {
3426        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3427          if (err2 != null) {
3428            future.completeExceptionally(unwrapCompletionException(err2));
3429          } else {
3430            serverStates.put(serverName, serverState);
3431          }
3432        }));
3433      });
3434      addListener(
3435        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3436        (ret, err3) -> {
3437          if (!future.isCompletedExceptionally()) {
3438            if (err3 != null) {
3439              future.completeExceptionally(err3);
3440            } else {
3441              future.complete(serverStates);
3442            }
3443          }
3444        });
3445    });
3446    return future;
3447  }
3448
3449  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3450    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3451    if (serverNamesList.isEmpty()) {
3452      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3453        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3454      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3455        if (err != null) {
3456          future.completeExceptionally(err);
3457        } else {
3458          future.complete(clusterMetrics.getServersName());
3459        }
3460      });
3461      return future;
3462    } else {
3463      List<ServerName> serverList = new ArrayList<>();
3464      for (String regionServerName : serverNamesList) {
3465        ServerName serverName = null;
3466        try {
3467          serverName = ServerName.valueOf(regionServerName);
3468        } catch (Exception e) {
3469          future.completeExceptionally(
3470            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3471        }
3472        if (serverName == null) {
3473          future.completeExceptionally(
3474            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3475        } else {
3476          serverList.add(serverName);
3477        }
3478      }
3479      future.complete(serverList);
3480    }
3481    return future;
3482  }
3483
3484  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3485    return this.<Boolean> newAdminCaller().serverName(serverName)
3486      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3487        Boolean> adminCall(controller, stub,
3488          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3489          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3490      .call();
3491  }
3492
3493  @Override
3494  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3495    return this.<Boolean> newMasterCaller()
3496      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3497        Boolean> call(controller, stub,
3498          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3499          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3500          (resp) -> resp.getPrevBalanceValue()))
3501      .call();
3502  }
3503
3504  @Override
3505  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3506    return this.<BalanceResponse> newMasterCaller()
3507      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3508        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3509          (s, c, req, done) -> s.balance(c, req, done),
3510          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3511      .call();
3512  }
3513
3514  @Override
3515  public CompletableFuture<Boolean> isBalancerEnabled() {
3516    return this.<Boolean> newMasterCaller()
3517      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3518        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3519          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3520      .call();
3521  }
3522
3523  @Override
3524  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3525    return this.<Boolean> newMasterCaller()
3526      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3527        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3528          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3529          (resp) -> resp.getPrevNormalizerValue()))
3530      .call();
3531  }
3532
3533  @Override
3534  public CompletableFuture<Boolean> isNormalizerEnabled() {
3535    return this.<Boolean> newMasterCaller()
3536      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3537        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3538          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3539      .call();
3540  }
3541
3542  @Override
3543  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3544    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3545  }
3546
3547  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3548    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3549      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3550  }
3551
3552  @Override
3553  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3554    return this.<Boolean> newMasterCaller()
3555      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3556        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3557          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3558          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3559          (resp) -> resp.getPrevValue()))
3560      .call();
3561  }
3562
3563  @Override
3564  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3565    return this.<Boolean> newMasterCaller()
3566      .action(
3567        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3568          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3569            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3570      .call();
3571  }
3572
3573  @Override
3574  public CompletableFuture<Boolean> runCleanerChore() {
3575    return this.<Boolean> newMasterCaller()
3576      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3577        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3578          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3579          (resp) -> resp.getCleanerChoreRan()))
3580      .call();
3581  }
3582
3583  @Override
3584  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3585    return this.<Boolean> newMasterCaller()
3586      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3587        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3588          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3589      .call();
3590  }
3591
3592  @Override
3593  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3594    return this.<Boolean> newMasterCaller()
3595      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3596        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3597          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3598          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3599      .call();
3600  }
3601
3602  @Override
3603  public CompletableFuture<Integer> runCatalogJanitor() {
3604    return this.<Integer> newMasterCaller()
3605      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3606        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3607          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3608      .call();
3609  }
3610
3611  @Override
3612  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3613    ServiceCaller<S, R> callable) {
3614    MasterCoprocessorRpcChannelImpl channel =
3615      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3616    S stub = stubMaker.apply(channel);
3617    CompletableFuture<R> future = new CompletableFuture<>();
3618    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3619    callable.call(stub, controller, resp -> {
3620      if (controller.failed()) {
3621        future.completeExceptionally(controller.getFailed());
3622      } else {
3623        future.complete(resp);
3624      }
3625    });
3626    return future;
3627  }
3628
3629  @Override
3630  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3631    ServiceCaller<S, R> callable, ServerName serverName) {
3632    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3633      this.<Message> newServerCaller().serverName(serverName));
3634    S stub = stubMaker.apply(channel);
3635    CompletableFuture<R> future = new CompletableFuture<>();
3636    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3637    callable.call(stub, controller, resp -> {
3638      if (controller.failed()) {
3639        future.completeExceptionally(controller.getFailed());
3640      } else {
3641        future.complete(resp);
3642      }
3643    });
3644    return future;
3645  }
3646
3647  @Override
3648  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3649    return this.<List<ServerName>> newMasterCaller()
3650      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3651        List<ServerName>> call(controller, stub,
3652          RequestConverter.buildClearDeadServersRequest(servers),
3653          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3654          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3655      .call();
3656  }
3657
3658  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3659    return this.connection.callerFactory.<T> serverRequest()
3660      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3661      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3662      .pause(pauseNs, TimeUnit.NANOSECONDS)
3663      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3664      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3665  }
3666
3667  @Override
3668  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3669    if (tableName == null) {
3670      return failedFuture(new IllegalArgumentException("Table name is null"));
3671    }
3672    CompletableFuture<Void> future = new CompletableFuture<>();
3673    addListener(tableExists(tableName), (exist, err) -> {
3674      if (err != null) {
3675        future.completeExceptionally(err);
3676        return;
3677      }
3678      if (!exist) {
3679        future.completeExceptionally(new TableNotFoundException(
3680          "Table '" + tableName.getNameAsString() + "' does not exists."));
3681        return;
3682      }
3683      addListener(getTableSplits(tableName), (splits, err1) -> {
3684        if (err1 != null) {
3685          future.completeExceptionally(err1);
3686        } else {
3687          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3688            if (err2 != null) {
3689              future.completeExceptionally(err2);
3690            } else {
3691              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3692                if (err3 != null) {
3693                  future.completeExceptionally(err3);
3694                } else {
3695                  future.complete(result3);
3696                }
3697              });
3698            }
3699          });
3700        }
3701      });
3702    });
3703    return future;
3704  }
3705
3706  @Override
3707  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3708    if (tableName == null) {
3709      return failedFuture(new IllegalArgumentException("Table name is null"));
3710    }
3711    CompletableFuture<Void> future = new CompletableFuture<>();
3712    addListener(tableExists(tableName), (exist, err) -> {
3713      if (err != null) {
3714        future.completeExceptionally(err);
3715        return;
3716      }
3717      if (!exist) {
3718        future.completeExceptionally(new TableNotFoundException(
3719          "Table '" + tableName.getNameAsString() + "' does not exists."));
3720        return;
3721      }
3722      addListener(setTableReplication(tableName, false), (result, err2) -> {
3723        if (err2 != null) {
3724          future.completeExceptionally(err2);
3725        } else {
3726          future.complete(result);
3727        }
3728      });
3729    });
3730    return future;
3731  }
3732
3733  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3734    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3735    addListener(
3736      getRegions(tableName).thenApply(regions -> regions.stream()
3737        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3738      (regions, err2) -> {
3739        if (err2 != null) {
3740          future.completeExceptionally(err2);
3741          return;
3742        }
3743        if (regions.size() == 1) {
3744          future.complete(null);
3745        } else {
3746          byte[][] splits = new byte[regions.size() - 1][];
3747          for (int i = 1; i < regions.size(); i++) {
3748            splits[i - 1] = regions.get(i).getStartKey();
3749          }
3750          future.complete(splits);
3751        }
3752      });
3753    return future;
3754  }
3755
3756  /**
3757   * Connect to peer and check the table descriptor on peer:
3758   * <ol>
3759   * <li>Create the same table on peer when not exist.</li>
3760   * <li>Throw an exception if the table already has replication enabled on any of the column
3761   * families.</li>
3762   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3763   * </ol>
3764   * @param tableName name of the table to sync to the peer
3765   * @param splits    table split keys
3766   */
3767  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3768    byte[][] splits) {
3769    CompletableFuture<Void> future = new CompletableFuture<>();
3770    addListener(listReplicationPeers(), (peers, err) -> {
3771      if (err != null) {
3772        future.completeExceptionally(err);
3773        return;
3774      }
3775      if (peers == null || peers.size() <= 0) {
3776        future.completeExceptionally(
3777          new IllegalArgumentException("Found no peer cluster for replication."));
3778        return;
3779      }
3780      List<CompletableFuture<Void>> futures = new ArrayList<>();
3781      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3782        .forEach(peer -> {
3783          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3784        });
3785      addListener(
3786        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3787        (result, err2) -> {
3788          if (err2 != null) {
3789            future.completeExceptionally(err2);
3790          } else {
3791            future.complete(result);
3792          }
3793        });
3794    });
3795    return future;
3796  }
3797
3798  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3799    ReplicationPeerDescription peer) {
3800    Configuration peerConf;
3801    try {
3802      peerConf = ReplicationPeerConfigUtil
3803        .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
3804    } catch (IOException e) {
3805      return failedFuture(e);
3806    }
3807    URI connectionUri =
3808      ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
3809    CompletableFuture<Void> future = new CompletableFuture<>();
3810    addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
3811      if (err != null) {
3812        future.completeExceptionally(err);
3813        return;
3814      }
3815      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3816        if (err1 != null) {
3817          future.completeExceptionally(err1);
3818          return;
3819        }
3820        AsyncAdmin peerAdmin = conn.getAdmin();
3821        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3822          if (err2 != null) {
3823            future.completeExceptionally(err2);
3824            return;
3825          }
3826          if (!exist) {
3827            CompletableFuture<Void> createTableFuture = null;
3828            if (splits == null) {
3829              createTableFuture = peerAdmin.createTable(tableDesc);
3830            } else {
3831              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3832            }
3833            addListener(createTableFuture, (result, err3) -> {
3834              if (err3 != null) {
3835                future.completeExceptionally(err3);
3836              } else {
3837                future.complete(result);
3838              }
3839            });
3840          } else {
3841            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3842              (result, err4) -> {
3843                if (err4 != null) {
3844                  future.completeExceptionally(err4);
3845                } else {
3846                  future.complete(result);
3847                }
3848              });
3849          }
3850        });
3851      });
3852    });
3853    return future;
3854  }
3855
3856  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3857    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3858    CompletableFuture<Void> future = new CompletableFuture<>();
3859    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3860      if (err != null) {
3861        future.completeExceptionally(err);
3862        return;
3863      }
3864      if (peerTableDesc == null) {
3865        future.completeExceptionally(
3866          new IllegalArgumentException("Failed to get table descriptor for table "
3867            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3868        return;
3869      }
3870      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3871        future.completeExceptionally(new IllegalArgumentException(
3872          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3873            + ", but the table descriptors are not same when compared with source cluster."
3874            + " Thus can not enable the table's replication switch."));
3875        return;
3876      }
3877      future.complete(null);
3878    });
3879    return future;
3880  }
3881
3882  /**
3883   * Set the table's replication switch if the table's replication switch is already not set.
3884   * @param tableName name of the table
3885   * @param enableRep is replication switch enable or disable
3886   */
3887  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3888    CompletableFuture<Void> future = new CompletableFuture<>();
3889    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3890      if (err != null) {
3891        future.completeExceptionally(err);
3892        return;
3893      }
3894      if (!tableDesc.matchReplicationScope(enableRep)) {
3895        int scope =
3896          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3897        TableDescriptor newTableDesc =
3898          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3899        addListener(modifyTable(newTableDesc), (result, err2) -> {
3900          if (err2 != null) {
3901            future.completeExceptionally(err2);
3902          } else {
3903            future.complete(result);
3904          }
3905        });
3906      } else {
3907        future.complete(null);
3908      }
3909    });
3910    return future;
3911  }
3912
3913  @Override
3914  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
3915    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
3916    request.setPeerId(peerId);
3917    return this.<Boolean> newMasterCaller()
3918      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
3919        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
3920          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
3921          resp -> resp.getIsEnabled()))
3922      .call();
3923  }
3924
3925  private void waitUntilAllReplicationPeerModificationProceduresDone(
3926    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
3927    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
3928      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
3929        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
3930          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
3931            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
3932            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
3933            resp -> resp.getProcedureList()))
3934        .call();
3935    addListener(callFuture, (r, e) -> {
3936      if (e != null) {
3937        future.completeExceptionally(e);
3938      } else if (r.isEmpty()) {
3939        // we are done
3940        future.complete(prevOn);
3941      } else {
3942        // retry later to see if the procedures are done
3943        retryTimer.newTimeout(
3944          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
3945          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3946      }
3947    });
3948  }
3949
3950  @Override
3951  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
3952    boolean drainProcedures) {
3953    ReplicationPeerModificationSwitchRequest request =
3954      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
3955    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
3956      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
3957        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
3958          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
3959          resp -> resp.getPreviousValue()))
3960      .call();
3961    // if we do not need to wait all previous peer modification procedure done, or we are enabling
3962    // peer modification, just return here.
3963    if (!drainProcedures || on) {
3964      return callFuture;
3965    }
3966    // otherwise we need to wait until all previous peer modification procedure done
3967    CompletableFuture<Boolean> future = new CompletableFuture<>();
3968    addListener(callFuture, (prevOn, err) -> {
3969      if (err != null) {
3970        future.completeExceptionally(err);
3971        return;
3972      }
3973      // even if the previous state is disabled, we still need to wait here, as there could be
3974      // another client thread which called this method just before us and have already changed the
3975      // state to off, but there are still peer modification procedures not finished, so we should
3976      // also wait here.
3977      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
3978    });
3979    return future;
3980  }
3981
3982  @Override
3983  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
3984    return this.<Boolean> newMasterCaller()
3985      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
3986        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
3987          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
3988          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
3989          (resp) -> resp.getEnabled()))
3990      .call();
3991  }
3992
3993  @Override
3994  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3995    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3996    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3997      if (err != null) {
3998        future.completeExceptionally(err);
3999        return;
4000      }
4001      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
4002        locations.stream().filter(l -> l.getRegion() != null)
4003          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
4004          .collect(Collectors.groupingBy(l -> l.getServerName(),
4005            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
4006      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
4007      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
4008      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
4009        futures
4010          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
4011            if (err2 != null) {
4012              future.completeExceptionally(unwrapCompletionException(err2));
4013            } else {
4014              aggregator.append(stats);
4015            }
4016          }));
4017      }
4018      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4019        (ret, err3) -> {
4020          if (err3 != null) {
4021            future.completeExceptionally(unwrapCompletionException(err3));
4022          } else {
4023            future.complete(aggregator.sum());
4024          }
4025        });
4026    });
4027    return future;
4028  }
4029
4030  @Override
4031  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4032    boolean preserveSplits) {
4033    CompletableFuture<Void> future = new CompletableFuture<>();
4034    addListener(tableExists(tableName), (exist, err) -> {
4035      if (err != null) {
4036        future.completeExceptionally(err);
4037        return;
4038      }
4039      if (!exist) {
4040        future.completeExceptionally(new TableNotFoundException(tableName));
4041        return;
4042      }
4043      addListener(tableExists(newTableName), (exist1, err1) -> {
4044        if (err1 != null) {
4045          future.completeExceptionally(err1);
4046          return;
4047        }
4048        if (exist1) {
4049          future.completeExceptionally(new TableExistsException(newTableName));
4050          return;
4051        }
4052        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4053          if (err2 != null) {
4054            future.completeExceptionally(err2);
4055            return;
4056          }
4057          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4058          if (preserveSplits) {
4059            addListener(getTableSplits(tableName), (splits, err3) -> {
4060              if (err3 != null) {
4061                future.completeExceptionally(err3);
4062              } else {
4063                addListener(
4064                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4065                  (result, err4) -> {
4066                    if (err4 != null) {
4067                      future.completeExceptionally(err4);
4068                    } else {
4069                      future.complete(result);
4070                    }
4071                  });
4072              }
4073            });
4074          } else {
4075            addListener(createTable(newTableDesc), (result, err5) -> {
4076              if (err5 != null) {
4077                future.completeExceptionally(err5);
4078              } else {
4079                future.complete(result);
4080              }
4081            });
4082          }
4083        });
4084      });
4085    });
4086    return future;
4087  }
4088
4089  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4090    List<RegionInfo> hris) {
4091    return this.<CacheEvictionStats> newAdminCaller()
4092      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4093        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4094          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4095          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4096          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4097      .serverName(serverName).call();
4098  }
4099
4100  @Override
4101  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4102    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4103      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4104        Boolean> call(controller, stub,
4105          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4106          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4107          resp -> resp.getPreviousRpcThrottleEnabled()))
4108      .call();
4109    return future;
4110  }
4111
4112  @Override
4113  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4114    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4115      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4116        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4117          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4118          resp -> resp.getRpcThrottleEnabled()))
4119      .call();
4120    return future;
4121  }
4122
4123  @Override
4124  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4125    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4126      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4127        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4128          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4129            .build(),
4130          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4131          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4132      .call();
4133    return future;
4134  }
4135
4136  @Override
4137  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4138    return this.<Map<TableName, Long>> newMasterCaller()
4139      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4140        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4141          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4142          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4143          resp -> resp.getSizesList().stream().collect(Collectors
4144            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4145      .call();
4146  }
4147
4148  @Override
4149  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4150    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4151    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4152      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4153        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4154          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4155          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4156          resp -> resp.getSnapshotsList().stream()
4157            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4158              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4159      .serverName(serverName).call();
4160  }
4161
4162  private CompletableFuture<SpaceQuotaSnapshot>
4163    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4164    return this.<SpaceQuotaSnapshot> newMasterCaller()
4165      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4166        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4167          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4168      .call();
4169  }
4170
4171  @Override
4172  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4173    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4174      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4175      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4176  }
4177
4178  @Override
4179  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4180    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4181    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4182      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4183      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4184  }
4185
4186  @Override
4187  public CompletableFuture<Void> grant(UserPermission userPermission,
4188    boolean mergeExistingPermissions) {
4189    return this.<Void> newMasterCaller()
4190      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4191        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4192        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4193      .call();
4194  }
4195
4196  @Override
4197  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4198    return this.<Void> newMasterCaller()
4199      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4200        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4201        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4202      .call();
4203  }
4204
4205  @Override
4206  public CompletableFuture<List<UserPermission>>
4207    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4208    return this.<List<UserPermission>> newMasterCaller()
4209      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4210        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4211          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4212          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4213          resp -> resp.getUserPermissionList().stream()
4214            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4215            .collect(Collectors.toList())))
4216      .call();
4217  }
4218
4219  @Override
4220  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4221    List<Permission> permissions) {
4222    return this.<List<Boolean>> newMasterCaller()
4223      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4224        List<Boolean>> call(controller, stub,
4225          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4226          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4227          resp -> resp.getHasUserPermissionList()))
4228      .call();
4229  }
4230
4231  @Override
4232  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4233    return this.<Boolean> newMasterCaller()
4234      .action((controller, stub) -> this.call(controller, stub,
4235        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4236        MasterService.Interface::switchSnapshotCleanup,
4237        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4238      .call();
4239  }
4240
4241  @Override
4242  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4243    return this.<Boolean> newMasterCaller()
4244      .action((controller, stub) -> this.call(controller, stub,
4245        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4246        MasterService.Interface::isSnapshotCleanupEnabled,
4247        IsSnapshotCleanupEnabledResponse::getEnabled))
4248      .call();
4249  }
4250
4251  @Override
4252  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4253    return this.<Void> newMasterCaller()
4254      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4255        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4256        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4257      .call();
4258  }
4259
4260  @Override
4261  public CompletableFuture<Void> addRSGroup(String groupName) {
4262    return this.<Void> newMasterCaller()
4263      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4264        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4265        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4266      .call();
4267  }
4268
4269  @Override
4270  public CompletableFuture<Void> removeRSGroup(String groupName) {
4271    return this.<Void> newMasterCaller()
4272      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4273        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4274        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4275      .call();
4276  }
4277
4278  @Override
4279  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4280    BalanceRequest request) {
4281    return this.<BalanceResponse> newMasterCaller()
4282      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4283        BalanceResponse> call(controller, stub,
4284          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4285          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4286      .call();
4287  }
4288
4289  @Override
4290  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4291    return this.<List<RSGroupInfo>> newMasterCaller()
4292      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4293        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4294          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4295            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4296      .call();
4297  }
4298
4299  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4300    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4301    final String logType) {
4302    if (CollectionUtils.isEmpty(serverNames)) {
4303      return CompletableFuture.completedFuture(Collections.emptyList());
4304    }
4305    return CompletableFuture.supplyAsync(() -> serverNames
4306      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4307        filterParams, limit, logType))
4308      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4309  }
4310
4311  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4312    Map<String, Object> filterParams, int limit, String logType) {
4313    return this.<List<LogEntry>> newAdminCaller()
4314      .action((controller, stub) -> this.adminCall(controller, stub,
4315        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4316        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4317      .serverName(serverName).call();
4318  }
4319
4320  @Override
4321  public CompletableFuture<List<Boolean>>
4322    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4323    if (CollectionUtils.isEmpty(serverNames)) {
4324      return CompletableFuture.completedFuture(Collections.emptyList());
4325    }
4326    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4327      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4328    return convertToFutureOfList(clearSlowLogResponseList);
4329  }
4330
4331  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4332    return this.<Boolean> newAdminCaller()
4333      .action((controller, stub) -> this.adminCall(controller, stub,
4334        RequestConverter.buildClearSlowLogResponseRequest(),
4335        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4336      .serverName(serverName).call();
4337  }
4338
4339  private static <T> CompletableFuture<List<T>>
4340    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4341    CompletableFuture<Void> allDoneFuture =
4342      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4343    return allDoneFuture
4344      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4345  }
4346
4347  @Override
4348  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4349    return this.<List<TableName>> newMasterCaller()
4350      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4351        List<TableName>> call(controller, stub,
4352          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4353          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4354            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4355      .call();
4356  }
4357
4358  @Override
4359  public CompletableFuture<Pair<List<String>, List<TableName>>>
4360    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4361    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4362      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4363        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4364        Pair<List<String>, List<TableName>>> call(controller, stub,
4365          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4366            .build(),
4367          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4368          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4369            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4370      .call();
4371  }
4372
4373  @Override
4374  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4375    return this.<RSGroupInfo> newMasterCaller()
4376      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4377        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4378          GetRSGroupInfoOfServerRequest.newBuilder()
4379            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4380              .setPort(hostPort.getPort()).build())
4381            .build(),
4382          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4383          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4384      .call();
4385  }
4386
4387  @Override
4388  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4389    return this.<Void> newMasterCaller()
4390      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4391        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4392        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4393      .call();
4394  }
4395
4396  @Override
4397  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4398    CompletableFuture<Void> future = new CompletableFuture<>();
4399    for (TableName tableName : tables) {
4400      addListener(tableExists(tableName), (exist, err) -> {
4401        if (err != null) {
4402          future.completeExceptionally(err);
4403          return;
4404        }
4405        if (!exist) {
4406          future.completeExceptionally(new TableNotFoundException(tableName));
4407          return;
4408        }
4409      });
4410    }
4411    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4412      if (err != null) {
4413        future.completeExceptionally(err);
4414        return;
4415      }
4416      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4417        future.complete(null);
4418        return;
4419      }
4420      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4421      for (TableDescriptor td : tableDescriptions) {
4422        newTableDescriptors
4423          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4424      }
4425      addListener(
4426        CompletableFuture.allOf(
4427          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4428        (v, e) -> {
4429          if (e != null) {
4430            future.completeExceptionally(e);
4431          } else {
4432            future.complete(v);
4433          }
4434        });
4435    });
4436    return future;
4437  }
4438
4439  @Override
4440  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4441    return this.<RSGroupInfo> newMasterCaller()
4442      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4443        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4444          GetRSGroupInfoOfTableRequest.newBuilder()
4445            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4446          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4447          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4448      .call();
4449  }
4450
4451  @Override
4452  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4453    return this.<RSGroupInfo> newMasterCaller()
4454      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4455        RSGroupInfo> call(controller, stub,
4456          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4457          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4458          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4459      .call();
4460  }
4461
4462  @Override
4463  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4464    return this.<Void> newMasterCaller()
4465      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4466        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4467          .setNewRsgroupName(newName).build(),
4468        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4469      .call();
4470  }
4471
4472  @Override
4473  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4474    Map<String, String> configuration) {
4475    UpdateRSGroupConfigRequest.Builder request =
4476      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4477    if (configuration != null) {
4478      configuration.entrySet().forEach(e -> request.addConfiguration(
4479        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4480    }
4481    return this.<Void> newMasterCaller()
4482      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4483        Void> call(controller, stub, request.build(),
4484          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4485      .call();
4486  }
4487
4488  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4489    return this.<List<LogEntry>> newMasterCaller()
4490      .action((controller, stub) -> this.call(controller, stub,
4491        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4492        ProtobufUtil::toBalancerDecisionResponse))
4493      .call();
4494  }
4495
4496  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4497    return this.<List<LogEntry>> newMasterCaller()
4498      .action((controller, stub) -> this.call(controller, stub,
4499        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4500        ProtobufUtil::toBalancerRejectionResponse))
4501      .call();
4502  }
4503
4504  @Override
4505  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4506    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4507    if (logType == null || serverType == null) {
4508      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4509    }
4510    switch (logType) {
4511      case "SLOW_LOG":
4512      case "LARGE_LOG":
4513        if (ServerType.MASTER.equals(serverType)) {
4514          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4515        }
4516        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4517      case "BALANCER_DECISION":
4518        if (ServerType.REGION_SERVER.equals(serverType)) {
4519          throw new IllegalArgumentException(
4520            "Balancer Decision logs are not maintained by HRegionServer");
4521        }
4522        return getBalancerDecisions(limit);
4523      case "BALANCER_REJECTION":
4524        if (ServerType.REGION_SERVER.equals(serverType)) {
4525          throw new IllegalArgumentException(
4526            "Balancer Rejection logs are not maintained by HRegionServer");
4527        }
4528        return getBalancerRejections(limit);
4529      default:
4530        return CompletableFuture.completedFuture(Collections.emptyList());
4531    }
4532  }
4533
4534  @Override
4535  public CompletableFuture<Void> flushMasterStore() {
4536    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4537    return this.<Void> newMasterCaller()
4538      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4539        Void> call(controller, stub, request.build(),
4540          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4541      .call();
4542  }
4543
4544  @Override
4545  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4546    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4547    return this.<List<String>> newAdminCaller()
4548      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4549        List<String>> adminCall(controller, stub, request.build(),
4550          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4551          resp -> resp.getCachedFilesList()))
4552      .serverName(serverName).call();
4553  }
4554}