001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.net.URI;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.CatalogFamilyFormat;
053import org.apache.hadoop.hbase.ClientMetaTableAccessor;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.NamespaceDescriptor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.RegionMetrics;
063import org.apache.hadoop.hbase.RegionMetricsBuilder;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableExistsException;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotDisabledException;
068import org.apache.hadoop.hbase.TableNotEnabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.Scan.ReadType;
075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
076import org.apache.hadoop.hbase.client.replication.TableCFs;
077import org.apache.hadoop.hbase.client.security.SecurityCapability;
078import org.apache.hadoop.hbase.exceptions.DeserializationException;
079import org.apache.hadoop.hbase.ipc.HBaseRpcController;
080import org.apache.hadoop.hbase.net.Address;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
086import org.apache.hadoop.hbase.replication.ReplicationException;
087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
089import org.apache.hadoop.hbase.replication.SyncReplicationState;
090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
092import org.apache.hadoop.hbase.security.access.Permission;
093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
094import org.apache.hadoop.hbase.security.access.UserPermission;
095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.Strings;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
108import org.apache.hbase.thirdparty.com.google.protobuf.Message;
109import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
110import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
111import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
112import org.apache.hbase.thirdparty.io.netty.util.Timeout;
113import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
114import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
115
116import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
117import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
357import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
358
359/**
360 * The implementation of AsyncAdmin.
361 * <p>
362 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
363 * be finished inside the rpc framework thread, which means that the callbacks registered to the
364 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
365 * this class should not try to do time consuming tasks in the callbacks.
366 * @since 2.0.0
367 * @see AsyncHBaseAdmin
368 * @see AsyncConnection#getAdmin()
369 * @see AsyncConnection#getAdminBuilder()
370 */
371@InterfaceAudience.Private
372class RawAsyncHBaseAdmin implements AsyncAdmin {
373
374  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
375
376  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
377
378  private final AsyncConnectionImpl connection;
379
380  private final HashedWheelTimer retryTimer;
381
382  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
383
384  private final long rpcTimeoutNs;
385
386  private final long operationTimeoutNs;
387
388  private final long pauseNs;
389
390  private final long pauseNsForServerOverloaded;
391
392  private final int maxAttempts;
393
394  private final int startLogErrorsCnt;
395
396  private final NonceGenerator ng;
397
398  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
399    AsyncAdminBuilderBase builder) {
400    this.connection = connection;
401    this.retryTimer = retryTimer;
402    this.metaTable = connection.getTable(META_TABLE_NAME);
403    this.rpcTimeoutNs = builder.rpcTimeoutNs;
404    this.operationTimeoutNs = builder.operationTimeoutNs;
405    this.pauseNs = builder.pauseNs;
406    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
407      LOG.warn(
408        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
409          + " the normal pause value {} ms, use the greater one instead",
410        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
411        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
412      this.pauseNsForServerOverloaded = builder.pauseNs;
413    } else {
414      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
415    }
416    this.maxAttempts = builder.maxAttempts;
417    this.startLogErrorsCnt = builder.startLogErrorsCnt;
418    this.ng = connection.getNonceGenerator();
419  }
420
421  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
422    return this.connection.callerFactory.<T> masterRequest()
423      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
424      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
425      .pause(pauseNs, TimeUnit.NANOSECONDS)
426      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
427      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
428  }
429
430  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
431    return this.connection.callerFactory.<T> adminRequest()
432      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
433      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
434      .pause(pauseNs, TimeUnit.NANOSECONDS)
435      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
436      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
437  }
438
439  @FunctionalInterface
440  private interface MasterRpcCall<RESP, REQ> {
441    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
442      RpcCallback<RESP> done);
443  }
444
445  @FunctionalInterface
446  private interface AdminRpcCall<RESP, REQ> {
447    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
448      RpcCallback<RESP> done);
449  }
450
451  @FunctionalInterface
452  private interface Converter<D, S> {
453    D convert(S src) throws IOException;
454  }
455
456  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
457    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
458    Converter<RESP, PRESP> respConverter) {
459    CompletableFuture<RESP> future = new CompletableFuture<>();
460    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
461
462      @Override
463      public void run(PRESP resp) {
464        if (controller.failed()) {
465          future.completeExceptionally(controller.getFailed());
466        } else {
467          try {
468            future.complete(respConverter.convert(resp));
469          } catch (IOException e) {
470            future.completeExceptionally(e);
471          }
472        }
473      }
474    });
475    return future;
476  }
477
478  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
479    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
480    Converter<RESP, PRESP> respConverter) {
481    CompletableFuture<RESP> future = new CompletableFuture<>();
482    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
483
484      @Override
485      public void run(PRESP resp) {
486        if (controller.failed()) {
487          future.completeExceptionally(controller.getFailed());
488        } else {
489          try {
490            future.complete(respConverter.convert(resp));
491          } catch (IOException e) {
492            future.completeExceptionally(e);
493          }
494        }
495      }
496    });
497    return future;
498  }
499
500  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
501    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
502    ProcedureBiConsumer consumer) {
503    return procedureCall(b -> {
504    }, preq, rpcCall, respConverter, consumer);
505  }
506
507  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
508    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
509    ProcedureBiConsumer consumer) {
510    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
511  }
512
513  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
514    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
515    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
516    ProcedureBiConsumer consumer) {
517    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
518      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
519    prioritySetter.accept(builder);
520    CompletableFuture<Long> procFuture = builder.call();
521    CompletableFuture<Void> future = waitProcedureResult(procFuture);
522    addListener(future, consumer);
523    return future;
524  }
525
526  @Override
527  public CompletableFuture<Boolean> tableExists(TableName tableName) {
528    if (TableName.isMetaTableName(tableName)) {
529      return CompletableFuture.completedFuture(true);
530    }
531    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
532  }
533
534  @Override
535  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
536    return getTableDescriptors(
537      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
538  }
539
540  /**
541   * {@link #listTableDescriptors(boolean)}
542   */
543  @Override
544  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
545    boolean includeSysTables) {
546    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
547      + "use listTableDescriptors(boolean) instead");
548    return getTableDescriptors(
549      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
550  }
551
552  @Override
553  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
554    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
555      + "use listTableDescriptors(boolean) instead");
556    if (tableNames.isEmpty()) {
557      return CompletableFuture.completedFuture(Collections.emptyList());
558    }
559    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
560  }
561
562  private CompletableFuture<List<TableDescriptor>>
563    getTableDescriptors(GetTableDescriptorsRequest request) {
564    return this.<List<TableDescriptor>> newMasterCaller()
565      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
566        List<TableDescriptor>> call(controller, stub, request,
567          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
568          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
569      .call();
570  }
571
572  @Override
573  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
574    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
575  }
576
577  @Override
578  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
579    boolean includeSysTables) {
580    Preconditions.checkNotNull(pattern,
581      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
582    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
583  }
584
585  @Override
586  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
587    return this.<List<TableName>> newMasterCaller()
588      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
589        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
590          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
591          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
592          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
593      .call();
594  }
595
596  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
597    return this.<List<TableName>> newMasterCaller()
598      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
599        List<TableName>> call(controller, stub, request,
600          (s, c, req, done) -> s.getTableNames(c, req, done),
601          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
602      .call();
603  }
604
605  @Override
606  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
607    return this.<List<TableDescriptor>> newMasterCaller()
608      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
609        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
610          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
611          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
612          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
613      .call();
614  }
615
616  @Override
617  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
618    return this.<List<TableDescriptor>> newMasterCaller()
619      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
620        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
621          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
622          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
623          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
624      .call();
625  }
626
627  @Override
628  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
629    return this.<List<TableName>> newMasterCaller()
630      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
631        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
632          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
633          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
634          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
635      .call();
636  }
637
638  @Override
639  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
640    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
641    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
642      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
643        List<TableSchema>> call(controller, stub,
644          RequestConverter.buildGetTableDescriptorsRequest(tableName),
645          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
646          (resp) -> resp.getTableSchemaList()))
647      .call(), (tableSchemas, error) -> {
648        if (error != null) {
649          future.completeExceptionally(error);
650          return;
651        }
652        if (!tableSchemas.isEmpty()) {
653          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
654        } else {
655          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
656        }
657      });
658    return future;
659  }
660
661  @Override
662  public CompletableFuture<Void> createTable(TableDescriptor desc) {
663    return createTable(desc.getTableName(),
664      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
665  }
666
667  @Override
668  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
669    int numRegions) {
670    try {
671      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
672    } catch (IllegalArgumentException e) {
673      return failedFuture(e);
674    }
675  }
676
677  @Override
678  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
679    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
680      + " use createTable(TableDescriptor) instead");
681    try {
682      verifySplitKeys(splitKeys);
683      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
684        splitKeys, ng.getNonceGroup(), ng.newNonce()));
685    } catch (IllegalArgumentException e) {
686      return failedFuture(e);
687    }
688  }
689
690  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
691    Preconditions.checkNotNull(tableName, "table name is null");
692    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
693      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
694      new CreateTableProcedureBiConsumer(tableName));
695  }
696
697  @Override
698  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
699    return modifyTable(desc, true);
700  }
701
702  @Override
703  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
704    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
705      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
706        ng.newNonce(), reopenRegions),
707      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
708      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
709  }
710
711  @Override
712  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
713    return this.<ModifyTableStoreFileTrackerRequest,
714      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
715        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
716          ng.getNonceGroup(), ng.newNonce()),
717        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
718        (resp) -> resp.getProcId(),
719        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
720  }
721
722  @Override
723  public CompletableFuture<Void> deleteTable(TableName tableName) {
724    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
725      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
726      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
727      new DeleteTableProcedureBiConsumer(tableName));
728  }
729
730  @Override
731  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
732    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
733      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
734        ng.newNonce()),
735      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
736      new TruncateTableProcedureBiConsumer(tableName));
737  }
738
739  @Override
740  public CompletableFuture<Void> enableTable(TableName tableName) {
741    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
742      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
743      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
744      new EnableTableProcedureBiConsumer(tableName));
745  }
746
747  @Override
748  public CompletableFuture<Void> disableTable(TableName tableName) {
749    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
750      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
751      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
752      new DisableTableProcedureBiConsumer(tableName));
753  }
754
755  /**
756   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
757   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
758   * targetState).
759   */
760  private static CompletableFuture<Boolean> completeCheckTableState(
761    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
762    TableState.State targetState, TableName tableName) {
763    if (error != null) {
764      future.completeExceptionally(error);
765    } else {
766      if (tableState != null) {
767        future.complete(tableState.inStates(targetState));
768      } else {
769        future.completeExceptionally(new TableNotFoundException(tableName));
770      }
771    }
772    return future;
773  }
774
775  @Override
776  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
777    if (TableName.isMetaTableName(tableName)) {
778      return CompletableFuture.completedFuture(true);
779    }
780    CompletableFuture<Boolean> future = new CompletableFuture<>();
781    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
782      (tableState, error) -> {
783        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
784          TableState.State.ENABLED, tableName);
785      });
786    return future;
787  }
788
789  @Override
790  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
791    if (TableName.isMetaTableName(tableName)) {
792      return CompletableFuture.completedFuture(false);
793    }
794    CompletableFuture<Boolean> future = new CompletableFuture<>();
795    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
796      (tableState, error) -> {
797        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
798          TableState.State.DISABLED, tableName);
799      });
800    return future;
801  }
802
803  @Override
804  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
805    if (TableName.isMetaTableName(tableName)) {
806      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
807        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
808    }
809    CompletableFuture<Boolean> future = new CompletableFuture<>();
810    addListener(isTableEnabled(tableName), (enabled, error) -> {
811      if (error != null) {
812        if (error instanceof TableNotFoundException) {
813          future.complete(false);
814        } else {
815          future.completeExceptionally(error);
816        }
817        return;
818      }
819      if (!enabled) {
820        future.complete(false);
821      } else {
822        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
823          (locations, error1) -> {
824            if (error1 != null) {
825              future.completeExceptionally(error1);
826              return;
827            }
828            List<HRegionLocation> notDeployedRegions = locations.stream()
829              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
830            if (notDeployedRegions.size() > 0) {
831              if (LOG.isDebugEnabled()) {
832                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
833              }
834              future.complete(false);
835              return;
836            }
837            future.complete(true);
838          });
839      }
840    });
841    return future;
842  }
843
844  @Override
845  public CompletableFuture<Void> addColumnFamily(TableName tableName,
846    ColumnFamilyDescriptor columnFamily) {
847    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
848      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
849        ng.newNonce()),
850      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
851      new AddColumnFamilyProcedureBiConsumer(tableName));
852  }
853
854  @Override
855  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
856    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
857      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
858        ng.newNonce()),
859      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
860      new DeleteColumnFamilyProcedureBiConsumer(tableName));
861  }
862
863  @Override
864  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
865    ColumnFamilyDescriptor columnFamily) {
866    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
867      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
868        ng.newNonce()),
869      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
870      new ModifyColumnFamilyProcedureBiConsumer(tableName));
871  }
872
873  @Override
874  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
875    byte[] family, String dstSFT) {
876    return this.<ModifyColumnStoreFileTrackerRequest,
877      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
878        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
879          ng.getNonceGroup(), ng.newNonce()),
880        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
881        (resp) -> resp.getProcId(),
882        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
883  }
884
885  @Override
886  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
887    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
888      RequestConverter.buildCreateNamespaceRequest(descriptor),
889      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
890      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
891  }
892
893  @Override
894  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
895    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
896      RequestConverter.buildModifyNamespaceRequest(descriptor),
897      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
898      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
899  }
900
901  @Override
902  public CompletableFuture<Void> deleteNamespace(String name) {
903    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
904      RequestConverter.buildDeleteNamespaceRequest(name),
905      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
906      new DeleteNamespaceProcedureBiConsumer(name));
907  }
908
909  @Override
910  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
911    return this.<NamespaceDescriptor> newMasterCaller()
912      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
913        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
914          RequestConverter.buildGetNamespaceDescriptorRequest(name),
915          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
916          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
917      .call();
918  }
919
920  @Override
921  public CompletableFuture<List<String>> listNamespaces() {
922    return this.<List<String>> newMasterCaller()
923      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
924        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
925          (s, c, req, done) -> s.listNamespaces(c, req, done),
926          (resp) -> resp.getNamespaceNameList()))
927      .call();
928  }
929
930  @Override
931  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
932    return this.<List<NamespaceDescriptor>> newMasterCaller()
933      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
934        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
935          ListNamespaceDescriptorsRequest.newBuilder().build(),
936          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
937          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
938      .call();
939  }
940
941  @Override
942  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
943    return this.<List<RegionInfo>> newAdminCaller()
944      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
945        List<RegionInfo>> adminCall(controller, stub,
946          RequestConverter.buildGetOnlineRegionRequest(),
947          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
948          resp -> ProtobufUtil.getRegionInfos(resp)))
949      .serverName(serverName).call();
950  }
951
952  @Override
953  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
954    if (tableName.equals(META_TABLE_NAME)) {
955      return connection.registry.getMetaRegionLocations()
956        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
957          .collect(Collectors.toList()));
958    } else {
959      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
960        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
961    }
962  }
963
964  @Override
965  public CompletableFuture<Void> flush(TableName tableName) {
966    return flush(tableName, Collections.emptyList());
967  }
968
969  @Override
970  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
971    Preconditions.checkNotNull(columnFamily,
972      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
973    return flush(tableName, Collections.singletonList(columnFamily));
974  }
975
976  @Override
977  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
978    // This is for keeping compatibility with old implementation.
979    // If the server version is lower than the client version, it's possible that the
980    // flushTable method is not present in the server side, if so, we need to fall back
981    // to the old implementation.
982    Preconditions.checkNotNull(columnFamilyList,
983      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
984    List<byte[]> columnFamilies = columnFamilyList.stream()
985      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
986    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
987      ng.getNonceGroup(), ng.newNonce());
988    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
989      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
990      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
991    CompletableFuture<Void> future = new CompletableFuture<>();
992    addListener(procFuture, (ret, error) -> {
993      if (error != null) {
994        if (
995          error instanceof TableNotFoundException || error instanceof TableNotEnabledException
996            || error instanceof NoSuchColumnFamilyException
997        ) {
998          future.completeExceptionally(error);
999        } else if (error instanceof DoNotRetryIOException) {
1000          // usually this is caused by the method is not present on the server or
1001          // the hbase hadoop version does not match the running hadoop version.
1002          // if that happens, we need fall back to the old flush implementation.
1003          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
1004          legacyFlush(future, tableName, columnFamilies);
1005        } else {
1006          future.completeExceptionally(error);
1007        }
1008      } else {
1009        future.complete(ret);
1010      }
1011    });
1012    return future;
1013  }
1014
1015  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1016    List<byte[]> columnFamilies) {
1017    addListener(tableExists(tableName), (exists, err) -> {
1018      if (err != null) {
1019        future.completeExceptionally(err);
1020      } else if (!exists) {
1021        future.completeExceptionally(new TableNotFoundException(tableName));
1022      } else {
1023        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1024          if (err2 != null) {
1025            future.completeExceptionally(err2);
1026          } else if (!tableEnabled) {
1027            future.completeExceptionally(new TableNotEnabledException(tableName));
1028          } else {
1029            Map<String, String> props = new HashMap<>();
1030            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1031              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1032                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1033            }
1034            addListener(
1035              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1036              (ret, err3) -> {
1037                if (err3 != null) {
1038                  future.completeExceptionally(err3);
1039                } else {
1040                  future.complete(ret);
1041                }
1042              });
1043          }
1044        });
1045      }
1046    });
1047  }
1048
1049  @Override
1050  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1051    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1052    });
1053  }
1054
1055  @Override
1056  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1057    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1058      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1059    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1060    });
1061  }
1062
1063  /**
1064   * This method is for internal use only, where we need the response of the flush.
1065   * <p/>
1066   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1067   * API.
1068   */
1069  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1070    boolean writeFlushWALMarker) {
1071    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1072    addListener(getRegionLocation(regionName), (location, err) -> {
1073      if (err != null) {
1074        future.completeExceptionally(err);
1075        return;
1076      }
1077      ServerName serverName = location.getServerName();
1078      if (serverName == null) {
1079        future
1080          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1081        return;
1082      }
1083      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1084        (ret, err2) -> {
1085          if (err2 != null) {
1086            future.completeExceptionally(err2);
1087          } else {
1088            future.complete(ret);
1089          }
1090        });
1091    });
1092    return future;
1093  }
1094
1095  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1096    byte[] columnFamily, boolean writeFlushWALMarker) {
1097    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1098      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1099        FlushRegionResponse> adminCall(controller, stub,
1100          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1101            writeFlushWALMarker),
1102          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1103      .call();
1104  }
1105
1106  @Override
1107  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1108    CompletableFuture<Void> future = new CompletableFuture<>();
1109    addListener(getRegions(sn), (hRegionInfos, err) -> {
1110      if (err != null) {
1111        future.completeExceptionally(err);
1112        return;
1113      }
1114      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1115      if (hRegionInfos != null) {
1116        hRegionInfos
1117          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1118          })));
1119      }
1120      addListener(CompletableFuture.allOf(
1121        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1122          if (err2 != null) {
1123            future.completeExceptionally(err2);
1124          } else {
1125            future.complete(ret);
1126          }
1127        });
1128    });
1129    return future;
1130  }
1131
1132  @Override
1133  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1134    return compact(tableName, null, false, compactType);
1135  }
1136
1137  @Override
1138  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1139    CompactType compactType) {
1140    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1141      + "If you don't specify a columnFamily, use compact(TableName) instead");
1142    return compact(tableName, columnFamily, false, compactType);
1143  }
1144
1145  @Override
1146  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1147    return compactRegion(regionName, null, false);
1148  }
1149
1150  @Override
1151  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1152    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1153      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1154    return compactRegion(regionName, columnFamily, false);
1155  }
1156
1157  @Override
1158  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1159    return compact(tableName, null, true, compactType);
1160  }
1161
1162  @Override
1163  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1164    CompactType compactType) {
1165    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1166      + "If you don't specify a columnFamily, use compact(TableName) instead");
1167    return compact(tableName, columnFamily, true, compactType);
1168  }
1169
1170  @Override
1171  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1172    return compactRegion(regionName, null, true);
1173  }
1174
1175  @Override
1176  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1177    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1178      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1179    return compactRegion(regionName, columnFamily, true);
1180  }
1181
1182  @Override
1183  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1184    return compactRegionServer(sn, false);
1185  }
1186
1187  @Override
1188  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1189    return compactRegionServer(sn, true);
1190  }
1191
1192  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1193    CompletableFuture<Void> future = new CompletableFuture<>();
1194    addListener(getRegions(sn), (hRegionInfos, err) -> {
1195      if (err != null) {
1196        future.completeExceptionally(err);
1197        return;
1198      }
1199      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1200      if (hRegionInfos != null) {
1201        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1202      }
1203      addListener(CompletableFuture.allOf(
1204        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1205          if (err2 != null) {
1206            future.completeExceptionally(err2);
1207          } else {
1208            future.complete(ret);
1209          }
1210        });
1211    });
1212    return future;
1213  }
1214
1215  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1216    boolean major) {
1217    CompletableFuture<Void> future = new CompletableFuture<>();
1218    addListener(getRegionLocation(regionName), (location, err) -> {
1219      if (err != null) {
1220        future.completeExceptionally(err);
1221        return;
1222      }
1223      ServerName serverName = location.getServerName();
1224      if (serverName == null) {
1225        future
1226          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1227        return;
1228      }
1229      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1230        (ret, err2) -> {
1231          if (err2 != null) {
1232            future.completeExceptionally(err2);
1233          } else {
1234            future.complete(ret);
1235          }
1236        });
1237    });
1238    return future;
1239  }
1240
1241  /**
1242   * List all region locations for the specific table.
1243   */
1244  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1245    if (TableName.META_TABLE_NAME.equals(tableName)) {
1246      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1247      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1248        if (err != null) {
1249          future.completeExceptionally(err);
1250        } else if (
1251          metaRegions == null || metaRegions.isEmpty()
1252            || metaRegions.getDefaultRegionLocation() == null
1253        ) {
1254          future.completeExceptionally(new IOException("meta region does not found"));
1255        } else {
1256          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1257        }
1258      });
1259      return future;
1260    } else {
1261      // For non-meta table, we fetch all locations by scanning hbase:meta table
1262      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1263    }
1264  }
1265
1266  /**
1267   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1268   */
1269  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1270    CompactType compactType) {
1271    CompletableFuture<Void> future = new CompletableFuture<>();
1272
1273    switch (compactType) {
1274      case MOB:
1275        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1276          if (err != null) {
1277            future.completeExceptionally(err);
1278            return;
1279          }
1280          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1281          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1282            if (err2 != null) {
1283              future.completeExceptionally(err2);
1284            } else {
1285              future.complete(ret);
1286            }
1287          });
1288        });
1289        break;
1290      case NORMAL:
1291        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1292          if (err != null) {
1293            future.completeExceptionally(err);
1294            return;
1295          }
1296          if (locations == null || locations.isEmpty()) {
1297            future.completeExceptionally(new TableNotFoundException(tableName));
1298          }
1299          CompletableFuture<?>[] compactFutures =
1300            locations.stream().filter(l -> l.getRegion() != null)
1301              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1302              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1303              .toArray(CompletableFuture<?>[]::new);
1304          // future complete unless all of the compact futures are completed.
1305          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1306            if (err2 != null) {
1307              future.completeExceptionally(err2);
1308            } else {
1309              future.complete(ret);
1310            }
1311          });
1312        });
1313        break;
1314      default:
1315        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1316    }
1317    return future;
1318  }
1319
1320  /**
1321   * Compact the region at specific region server.
1322   */
1323  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1324    final boolean major, byte[] columnFamily) {
1325    return this.<Void> newAdminCaller().serverName(sn)
1326      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1327        Void> adminCall(controller, stub,
1328          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1329          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1330      .call();
1331  }
1332
1333  private byte[] toEncodeRegionName(byte[] regionName) {
1334    return RegionInfo.isEncodedRegionName(regionName)
1335      ? regionName
1336      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1337  }
1338
1339  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1340    CompletableFuture<TableName> result) {
1341    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1342      if (err != null) {
1343        result.completeExceptionally(err);
1344        return;
1345      }
1346      RegionInfo regionInfo = location.getRegion();
1347      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1348        result.completeExceptionally(
1349          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1350        return;
1351      }
1352      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1353        if (!tableName.get().equals(regionInfo.getTable())) {
1354          // tables of this two region should be same.
1355          result.completeExceptionally(
1356            new IllegalArgumentException("Cannot merge regions from two different tables "
1357              + tableName.get() + " and " + regionInfo.getTable()));
1358        } else {
1359          result.complete(tableName.get());
1360        }
1361      }
1362    });
1363  }
1364
1365  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1366    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1367    CompletableFuture<TableName> future = new CompletableFuture<>();
1368    for (byte[] encodedRegionName : encodedRegionNames) {
1369      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1370    }
1371    return future;
1372  }
1373
1374  @Override
1375  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1376    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1377  }
1378
1379  @Override
1380  public CompletableFuture<Boolean> isMergeEnabled() {
1381    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1382  }
1383
1384  @Override
1385  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1386    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1387  }
1388
1389  @Override
1390  public CompletableFuture<Boolean> isSplitEnabled() {
1391    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1392  }
1393
1394  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1395    MasterSwitchType switchType) {
1396    SetSplitOrMergeEnabledRequest request =
1397      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1398    return this.<Boolean> newMasterCaller()
1399      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1400        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1401          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1402          (resp) -> resp.getPrevValueList().get(0)))
1403      .call();
1404  }
1405
1406  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1407    IsSplitOrMergeEnabledRequest request =
1408      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1409    return this.<Boolean> newMasterCaller()
1410      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1411        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1412          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1413      .call();
1414  }
1415
1416  @Override
1417  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1418    if (nameOfRegionsToMerge.size() < 2) {
1419      return failedFuture(new IllegalArgumentException(
1420        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1421    }
1422    CompletableFuture<Void> future = new CompletableFuture<>();
1423    byte[][] encodedNameOfRegionsToMerge =
1424      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1425
1426    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1427      if (err != null) {
1428        future.completeExceptionally(err);
1429        return;
1430      }
1431
1432      final MergeTableRegionsRequest request;
1433      try {
1434        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1435          forcible, ng.getNonceGroup(), ng.newNonce());
1436      } catch (DeserializationException e) {
1437        future.completeExceptionally(e);
1438        return;
1439      }
1440
1441      addListener(
1442        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1443          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1444        (ret, err2) -> {
1445          if (err2 != null) {
1446            future.completeExceptionally(err2);
1447          } else {
1448            future.complete(ret);
1449          }
1450        });
1451    });
1452    return future;
1453  }
1454
1455  @Override
1456  public CompletableFuture<Void> split(TableName tableName) {
1457    CompletableFuture<Void> future = new CompletableFuture<>();
1458    addListener(tableExists(tableName), (exist, error) -> {
1459      if (error != null) {
1460        future.completeExceptionally(error);
1461        return;
1462      }
1463      if (!exist) {
1464        future.completeExceptionally(new TableNotFoundException(tableName));
1465        return;
1466      }
1467      addListener(metaTable
1468        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1469          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1470            ClientMetaTableAccessor.QueryType.REGION))
1471          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1472            ClientMetaTableAccessor.QueryType.REGION))),
1473        (results, err2) -> {
1474          if (err2 != null) {
1475            future.completeExceptionally(err2);
1476            return;
1477          }
1478          if (results != null && !results.isEmpty()) {
1479            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1480            for (Result r : results) {
1481              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1482                continue;
1483              }
1484              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1485              if (rl != null) {
1486                for (HRegionLocation h : rl.getRegionLocations()) {
1487                  if (h != null && h.getServerName() != null) {
1488                    RegionInfo hri = h.getRegion();
1489                    if (
1490                      hri == null || hri.isSplitParent()
1491                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1492                    ) {
1493                      continue;
1494                    }
1495                    splitFutures.add(split(hri, null));
1496                  }
1497                }
1498              }
1499            }
1500            addListener(
1501              CompletableFuture
1502                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1503              (ret, exception) -> {
1504                if (exception != null) {
1505                  future.completeExceptionally(exception);
1506                  return;
1507                }
1508                future.complete(ret);
1509              });
1510          } else {
1511            future.complete(null);
1512          }
1513        });
1514    });
1515    return future;
1516  }
1517
1518  @Override
1519  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1520    CompletableFuture<Void> result = new CompletableFuture<>();
1521    if (splitPoint == null) {
1522      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1523    }
1524    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1525      (loc, err) -> {
1526        if (err != null) {
1527          result.completeExceptionally(err);
1528        } else if (loc == null || loc.getRegion() == null) {
1529          result.completeExceptionally(new IllegalArgumentException(
1530            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1531        } else {
1532          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1533            if (err2 != null) {
1534              result.completeExceptionally(err2);
1535            } else {
1536              result.complete(ret);
1537            }
1538
1539          });
1540        }
1541      });
1542    return result;
1543  }
1544
1545  @Override
1546  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1547    CompletableFuture<Void> future = new CompletableFuture<>();
1548    addListener(getRegionLocation(regionName), (location, err) -> {
1549      if (err != null) {
1550        future.completeExceptionally(err);
1551        return;
1552      }
1553      RegionInfo regionInfo = location.getRegion();
1554      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1555        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1556          + "Replicas are auto-split when their primary is split."));
1557        return;
1558      }
1559      ServerName serverName = location.getServerName();
1560      if (serverName == null) {
1561        future
1562          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1563        return;
1564      }
1565      addListener(split(regionInfo, null), (ret, err2) -> {
1566        if (err2 != null) {
1567          future.completeExceptionally(err2);
1568        } else {
1569          future.complete(ret);
1570        }
1571      });
1572    });
1573    return future;
1574  }
1575
1576  @Override
1577  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1578    Preconditions.checkNotNull(splitPoint,
1579      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1580    CompletableFuture<Void> future = new CompletableFuture<>();
1581    addListener(getRegionLocation(regionName), (location, err) -> {
1582      if (err != null) {
1583        future.completeExceptionally(err);
1584        return;
1585      }
1586      RegionInfo regionInfo = location.getRegion();
1587      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1588        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1589          + "Replicas are auto-split when their primary is split."));
1590        return;
1591      }
1592      ServerName serverName = location.getServerName();
1593      if (serverName == null) {
1594        future
1595          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1596        return;
1597      }
1598      if (
1599        regionInfo.getStartKey() != null
1600          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1601      ) {
1602        future.completeExceptionally(
1603          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1604        return;
1605      }
1606      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1607        if (err2 != null) {
1608          future.completeExceptionally(err2);
1609        } else {
1610          future.complete(ret);
1611        }
1612      });
1613    });
1614    return future;
1615  }
1616
1617  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1618    CompletableFuture<Void> future = new CompletableFuture<>();
1619    TableName tableName = hri.getTable();
1620    final SplitTableRegionRequest request;
1621    try {
1622      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1623        ng.newNonce());
1624    } catch (DeserializationException e) {
1625      future.completeExceptionally(e);
1626      return future;
1627    }
1628
1629    addListener(
1630      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1631        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1632      (ret, err2) -> {
1633        if (err2 != null) {
1634          future.completeExceptionally(err2);
1635        } else {
1636          future.complete(ret);
1637        }
1638      });
1639    return future;
1640  }
1641
1642  @Override
1643  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1644    CompletableFuture<Void> future = new CompletableFuture<>();
1645    addListener(getRegionLocation(regionName), (location, err) -> {
1646      if (err != null) {
1647        future.completeExceptionally(err);
1648        return;
1649      }
1650      RegionInfo regionInfo = location.getRegion();
1651      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1652        future.completeExceptionally(new IllegalArgumentException(
1653          "Can't truncate replicas directly.Replicas are auto-truncated "
1654            + "when their primary is truncated."));
1655        return;
1656      }
1657      ServerName serverName = location.getServerName();
1658      if (serverName == null) {
1659        future
1660          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1661        return;
1662      }
1663      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1664        if (err2 != null) {
1665          future.completeExceptionally(err2);
1666        } else {
1667          future.complete(ret);
1668        }
1669      });
1670    });
1671    return future;
1672  }
1673
1674  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1675    CompletableFuture<Void> future = new CompletableFuture<>();
1676    TableName tableName = hri.getTable();
1677    final MasterProtos.TruncateRegionRequest request;
1678    try {
1679      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1680    } catch (DeserializationException e) {
1681      future.completeExceptionally(e);
1682      return future;
1683    }
1684    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1685      MasterProtos.TruncateRegionResponse::getProcId,
1686      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1687        if (err2 != null) {
1688          future.completeExceptionally(err2);
1689        } else {
1690          future.complete(ret);
1691        }
1692      });
1693    return future;
1694  }
1695
1696  @Override
1697  public CompletableFuture<Void> assign(byte[] regionName) {
1698    CompletableFuture<Void> future = new CompletableFuture<>();
1699    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1700      if (err != null) {
1701        future.completeExceptionally(err);
1702        return;
1703      }
1704      addListener(
1705        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1706          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1707            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1708            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1709          .call(),
1710        (ret, err2) -> {
1711          if (err2 != null) {
1712            future.completeExceptionally(err2);
1713          } else {
1714            future.complete(ret);
1715          }
1716        });
1717    });
1718    return future;
1719  }
1720
1721  @Override
1722  public CompletableFuture<Void> unassign(byte[] regionName) {
1723    CompletableFuture<Void> future = new CompletableFuture<>();
1724    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1725      if (err != null) {
1726        future.completeExceptionally(err);
1727        return;
1728      }
1729      addListener(
1730        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1731          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1732            Void> call(controller, stub,
1733              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1734              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1735          .call(),
1736        (ret, err2) -> {
1737          if (err2 != null) {
1738            future.completeExceptionally(err2);
1739          } else {
1740            future.complete(ret);
1741          }
1742        });
1743    });
1744    return future;
1745  }
1746
1747  @Override
1748  public CompletableFuture<Void> offline(byte[] regionName) {
1749    CompletableFuture<Void> future = new CompletableFuture<>();
1750    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1751      if (err != null) {
1752        future.completeExceptionally(err);
1753        return;
1754      }
1755      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1756        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1757          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1758          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1759        .call(), (ret, err2) -> {
1760          if (err2 != null) {
1761            future.completeExceptionally(err2);
1762          } else {
1763            future.complete(ret);
1764          }
1765        });
1766    });
1767    return future;
1768  }
1769
1770  @Override
1771  public CompletableFuture<Void> move(byte[] regionName) {
1772    CompletableFuture<Void> future = new CompletableFuture<>();
1773    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1774      if (err != null) {
1775        future.completeExceptionally(err);
1776        return;
1777      }
1778      addListener(
1779        moveRegion(regionInfo,
1780          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1781        (ret, err2) -> {
1782          if (err2 != null) {
1783            future.completeExceptionally(err2);
1784          } else {
1785            future.complete(ret);
1786          }
1787        });
1788    });
1789    return future;
1790  }
1791
1792  @Override
1793  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1794    Preconditions.checkNotNull(destServerName,
1795      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1796    CompletableFuture<Void> future = new CompletableFuture<>();
1797    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1798      if (err != null) {
1799        future.completeExceptionally(err);
1800        return;
1801      }
1802      addListener(
1803        moveRegion(regionInfo, RequestConverter
1804          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1805        (ret, err2) -> {
1806          if (err2 != null) {
1807            future.completeExceptionally(err2);
1808          } else {
1809            future.complete(ret);
1810          }
1811        });
1812    });
1813    return future;
1814  }
1815
1816  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1817    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1818      .action(
1819        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1820          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1821      .call();
1822  }
1823
1824  @Override
1825  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1826    return this.<Void> newMasterCaller()
1827      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1828        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1829        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1830      .call();
1831  }
1832
1833  @Override
1834  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1835    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1836    Scan scan = QuotaTableUtil.makeScan(filter);
1837    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1838      new AdvancedScanResultConsumer() {
1839        List<QuotaSettings> settings = new ArrayList<>();
1840
1841        @Override
1842        public void onNext(Result[] results, ScanController controller) {
1843          for (Result result : results) {
1844            try {
1845              QuotaTableUtil.parseResultToCollection(result, settings);
1846            } catch (IOException e) {
1847              controller.terminate();
1848              future.completeExceptionally(e);
1849            }
1850          }
1851        }
1852
1853        @Override
1854        public void onError(Throwable error) {
1855          future.completeExceptionally(error);
1856        }
1857
1858        @Override
1859        public void onComplete() {
1860          future.complete(settings);
1861        }
1862      });
1863    return future;
1864  }
1865
1866  @Override
1867  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1868    boolean enabled) {
1869    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1870      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1871      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1872      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1873  }
1874
1875  @Override
1876  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1877    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1878      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1879      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1880      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1881  }
1882
1883  @Override
1884  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1885    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1886      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1887      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1888      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1889  }
1890
1891  @Override
1892  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1893    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1894      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1895      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1896      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1897  }
1898
1899  @Override
1900  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1901    return this.<ReplicationPeerConfig> newMasterCaller()
1902      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1903        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1904          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1905          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1906          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1907      .call();
1908  }
1909
1910  @Override
1911  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1912    ReplicationPeerConfig peerConfig) {
1913    return this.<UpdateReplicationPeerConfigRequest,
1914      UpdateReplicationPeerConfigResponse> procedureCall(
1915        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1916        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1917        (resp) -> resp.getProcId(),
1918        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1919  }
1920
1921  @Override
1922  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1923    SyncReplicationState clusterState) {
1924    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1925      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1926        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1927          clusterState),
1928        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1929        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1930          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1931  }
1932
1933  @Override
1934  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1935    Map<TableName, List<String>> tableCfs) {
1936    if (tableCfs == null) {
1937      return failedFuture(new ReplicationException("tableCfs is null"));
1938    }
1939
1940    CompletableFuture<Void> future = new CompletableFuture<Void>();
1941    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1942      if (!completeExceptionally(future, error)) {
1943        ReplicationPeerConfig newPeerConfig =
1944          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1945        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1946          if (!completeExceptionally(future, error)) {
1947            future.complete(result);
1948          }
1949        });
1950      }
1951    });
1952    return future;
1953  }
1954
1955  @Override
1956  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1957    Map<TableName, List<String>> tableCfs) {
1958    if (tableCfs == null) {
1959      return failedFuture(new ReplicationException("tableCfs is null"));
1960    }
1961
1962    CompletableFuture<Void> future = new CompletableFuture<Void>();
1963    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1964      if (!completeExceptionally(future, error)) {
1965        ReplicationPeerConfig newPeerConfig = null;
1966        try {
1967          newPeerConfig = ReplicationPeerConfigUtil
1968            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1969        } catch (ReplicationException e) {
1970          future.completeExceptionally(e);
1971          return;
1972        }
1973        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1974          if (!completeExceptionally(future, error)) {
1975            future.complete(result);
1976          }
1977        });
1978      }
1979    });
1980    return future;
1981  }
1982
1983  @Override
1984  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1985    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1986  }
1987
1988  @Override
1989  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1990    Preconditions.checkNotNull(pattern,
1991      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1992    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1993  }
1994
1995  private CompletableFuture<List<ReplicationPeerDescription>>
1996    listReplicationPeers(ListReplicationPeersRequest request) {
1997    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1998      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1999        List<ReplicationPeerDescription>> call(controller, stub, request,
2000          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
2001          (resp) -> resp.getPeerDescList().stream()
2002            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
2003            .collect(Collectors.toList())))
2004      .call();
2005  }
2006
2007  @Override
2008  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2009    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2010    addListener(listTableDescriptors(), (tables, error) -> {
2011      if (!completeExceptionally(future, error)) {
2012        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2013        tables.forEach(table -> {
2014          Map<String, Integer> cfs = new HashMap<>();
2015          Stream.of(table.getColumnFamilies())
2016            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2017            .forEach(column -> {
2018              cfs.put(column.getNameAsString(), column.getScope());
2019            });
2020          if (!cfs.isEmpty()) {
2021            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2022          }
2023        });
2024        future.complete(replicatedTableCFs);
2025      }
2026    });
2027    return future;
2028  }
2029
2030  @Override
2031  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2032    SnapshotProtos.SnapshotDescription snapshot =
2033      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2034    try {
2035      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2036    } catch (IllegalArgumentException e) {
2037      return failedFuture(e);
2038    }
2039    CompletableFuture<Void> future = new CompletableFuture<>();
2040    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2041      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2042    addListener(this.<SnapshotResponse> newMasterCaller()
2043      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2044        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2045      .call(), (resp, err) -> {
2046        if (err != null) {
2047          future.completeExceptionally(err);
2048          return;
2049        }
2050        waitSnapshotFinish(snapshotDesc, future, resp);
2051      });
2052    return future;
2053  }
2054
2055  // This is for keeping compatibility with old implementation.
2056  // If there is a procId field in the response, then the snapshot will be operated with a
2057  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
2058  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
2059    SnapshotResponse resp) {
2060    if (resp.hasProcId()) {
2061      getProcedureResult(resp.getProcId(), future, 0);
2062      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
2063    } else {
2064      long expectedTimeout = resp.getExpectedTimeout();
2065      TimerTask pollingTask = new TimerTask() {
2066        int tries = 0;
2067        long startTime = EnvironmentEdgeManager.currentTime();
2068        long endTime = startTime + expectedTimeout;
2069        long maxPauseTime = expectedTimeout / maxAttempts;
2070
2071        @Override
2072        public void run(Timeout timeout) throws Exception {
2073          if (EnvironmentEdgeManager.currentTime() < endTime) {
2074            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
2075              if (err2 != null) {
2076                future.completeExceptionally(err2);
2077              } else if (done) {
2078                future.complete(null);
2079              } else {
2080                // retry again after pauseTime.
2081                long pauseTime =
2082                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2083                pauseTime = Math.min(pauseTime, maxPauseTime);
2084                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
2085              }
2086            });
2087          } else {
2088            future
2089              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
2090                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
2091          }
2092        }
2093      };
2094      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2095    }
2096  }
2097
2098  @Override
2099  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2100    return this.<Boolean> newMasterCaller()
2101      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2102        Boolean> call(controller, stub,
2103          IsSnapshotDoneRequest.newBuilder()
2104            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2105          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2106      .call();
2107  }
2108
2109  @Override
2110  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2111    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2112      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2113      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2114    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2115  }
2116
2117  @Override
2118  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2119    boolean restoreAcl) {
2120    CompletableFuture<Void> future = new CompletableFuture<>();
2121    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2122      if (err != null) {
2123        future.completeExceptionally(err);
2124        return;
2125      }
2126      TableName tableName = null;
2127      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2128        for (SnapshotDescription snap : snapshotDescriptions) {
2129          if (snap.getName().equals(snapshotName)) {
2130            tableName = snap.getTableName();
2131            break;
2132          }
2133        }
2134      }
2135      if (tableName == null) {
2136        future.completeExceptionally(
2137          new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist."));
2138        return;
2139      }
2140      final TableName finalTableName = tableName;
2141      addListener(tableExists(finalTableName), (exists, err2) -> {
2142        if (err2 != null) {
2143          future.completeExceptionally(err2);
2144        } else if (!exists) {
2145          // if table does not exist, then just clone snapshot into new table.
2146          completeConditionalOnFuture(future,
2147            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2148        } else {
2149          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2150            if (err4 != null) {
2151              future.completeExceptionally(err4);
2152            } else if (!disabled) {
2153              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2154            } else {
2155              completeConditionalOnFuture(future,
2156                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2157            }
2158          });
2159        }
2160      });
2161    });
2162    return future;
2163  }
2164
2165  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2166    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2167    if (takeFailSafeSnapshot) {
2168      CompletableFuture<Void> future = new CompletableFuture<>();
2169      // Step.1 Take a snapshot of the current state
2170      String failSafeSnapshotSnapshotNameFormat =
2171        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2172          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2173      final String failSafeSnapshotSnapshotName =
2174        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2175          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2176          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2177      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2178      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2179        if (err != null) {
2180          future.completeExceptionally(err);
2181        } else {
2182          // Step.2 Restore snapshot
2183          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2184            (void2, err2) -> {
2185              if (err2 != null) {
2186                // Step.3.a Something went wrong during the restore and try to rollback.
2187                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2188                  restoreAcl, null), (void3, err3) -> {
2189                    if (err3 != null) {
2190                      future.completeExceptionally(err3);
2191                    } else {
2192                      // If fail to restore snapshot but rollback successfully, delete the
2193                      // restore-failsafe snapshot.
2194                      LOG.info(
2195                        "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2196                      addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> {
2197                        if (err4 != null) {
2198                          LOG.error("Unable to remove the failsafe snapshot: {}",
2199                            failSafeSnapshotSnapshotName, err4);
2200                        }
2201                        String msg =
2202                          "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot="
2203                            + failSafeSnapshotSnapshotName + " succeeded.";
2204                        LOG.error(msg);
2205                        future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2206                      });
2207                    }
2208                  });
2209              } else {
2210                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2211                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2212                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2213                  if (err3 != null) {
2214                    LOG.error(
2215                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2216                      err3);
2217                  }
2218                  future.complete(ret3);
2219                });
2220              }
2221            });
2222        }
2223      });
2224      return future;
2225    } else {
2226      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2227    }
2228  }
2229
2230  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2231    CompletableFuture<T> parentFuture) {
2232    addListener(parentFuture, (res, err) -> {
2233      if (err != null) {
2234        dependentFuture.completeExceptionally(err);
2235      } else {
2236        dependentFuture.complete(res);
2237      }
2238    });
2239  }
2240
2241  @Override
2242  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2243    boolean restoreAcl, String customSFT) {
2244    CompletableFuture<Void> future = new CompletableFuture<>();
2245    addListener(tableExists(tableName), (exists, err) -> {
2246      if (err != null) {
2247        future.completeExceptionally(err);
2248      } else if (exists) {
2249        future.completeExceptionally(new TableExistsException(tableName));
2250      } else {
2251        completeConditionalOnFuture(future,
2252          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2253      }
2254    });
2255    return future;
2256  }
2257
2258  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2259    boolean restoreAcl, String customSFT) {
2260    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2261      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2262    try {
2263      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2264    } catch (IllegalArgumentException e) {
2265      return failedFuture(e);
2266    }
2267    RestoreSnapshotRequest.Builder builder =
2268      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2269        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2270    if (customSFT != null) {
2271      builder.setCustomSFT(customSFT);
2272    }
2273    return waitProcedureResult(this.<Long> newMasterCaller()
2274      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2275        Long> call(controller, stub, builder.build(),
2276          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2277      .call());
2278  }
2279
2280  @Override
2281  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2282    return getCompletedSnapshots(null);
2283  }
2284
2285  @Override
2286  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2287    Preconditions.checkNotNull(pattern,
2288      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2289    return getCompletedSnapshots(pattern);
2290  }
2291
2292  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2293    return this.<List<SnapshotDescription>> newMasterCaller()
2294      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2295        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2296          GetCompletedSnapshotsRequest.newBuilder().build(),
2297          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2298          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2299      .call();
2300  }
2301
2302  @Override
2303  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2304    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2305      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2306    return getCompletedSnapshots(tableNamePattern, null);
2307  }
2308
2309  @Override
2310  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2311    Pattern snapshotNamePattern) {
2312    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2313      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2314    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2315      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2316    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2317  }
2318
2319  private CompletableFuture<List<SnapshotDescription>>
2320    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2321    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2322    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2323      if (err != null) {
2324        future.completeExceptionally(err);
2325        return;
2326      }
2327      if (tableNames == null || tableNames.size() <= 0) {
2328        future.complete(Collections.emptyList());
2329        return;
2330      }
2331      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2332        if (err2 != null) {
2333          future.completeExceptionally(err2);
2334          return;
2335        }
2336        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2337          future.complete(Collections.emptyList());
2338          return;
2339        }
2340        future.complete(snapshotDescList.stream()
2341          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2342          .collect(Collectors.toList()));
2343      });
2344    });
2345    return future;
2346  }
2347
2348  @Override
2349  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2350    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2351  }
2352
2353  @Override
2354  public CompletableFuture<Void> deleteSnapshots() {
2355    return internalDeleteSnapshots(null, null);
2356  }
2357
2358  @Override
2359  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2360    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2361      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2362    return internalDeleteSnapshots(null, snapshotNamePattern);
2363  }
2364
2365  @Override
2366  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2367    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2368      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2369    return internalDeleteSnapshots(tableNamePattern, null);
2370  }
2371
2372  @Override
2373  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2374    Pattern snapshotNamePattern) {
2375    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2376      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2377    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2378      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2379    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2380  }
2381
2382  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2383    Pattern snapshotNamePattern) {
2384    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2385    if (tableNamePattern == null) {
2386      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2387    } else {
2388      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2389    }
2390    CompletableFuture<Void> future = new CompletableFuture<>();
2391    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2392      if (err != null) {
2393        future.completeExceptionally(err);
2394        return;
2395      }
2396      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2397        future.complete(null);
2398        return;
2399      }
2400      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2401        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2402          if (e != null) {
2403            future.completeExceptionally(e);
2404          } else {
2405            future.complete(v);
2406          }
2407        });
2408    });
2409    return future;
2410  }
2411
2412  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2413    return this.<Void> newMasterCaller()
2414      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2415        controller, stub,
2416        DeleteSnapshotRequest.newBuilder()
2417          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2418        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2419      .call();
2420  }
2421
2422  @Override
2423  public CompletableFuture<Void> execProcedure(String signature, String instance,
2424    Map<String, String> props) {
2425    CompletableFuture<Void> future = new CompletableFuture<>();
2426    ProcedureDescription procDesc =
2427      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2428    addListener(this.<Long> newMasterCaller()
2429      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2430        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2431        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2432      .call(), (expectedTimeout, err) -> {
2433        if (err != null) {
2434          future.completeExceptionally(err);
2435          return;
2436        }
2437        TimerTask pollingTask = new TimerTask() {
2438          int tries = 0;
2439          long startTime = EnvironmentEdgeManager.currentTime();
2440          long endTime = startTime + expectedTimeout;
2441          long maxPauseTime = expectedTimeout / maxAttempts;
2442
2443          @Override
2444          public void run(Timeout timeout) throws Exception {
2445            if (EnvironmentEdgeManager.currentTime() < endTime) {
2446              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2447                if (err2 != null) {
2448                  future.completeExceptionally(err2);
2449                  return;
2450                }
2451                if (done) {
2452                  future.complete(null);
2453                } else {
2454                  // retry again after pauseTime.
2455                  long pauseTime =
2456                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2457                  pauseTime = Math.min(pauseTime, maxPauseTime);
2458                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2459                    TimeUnit.MICROSECONDS);
2460                }
2461              });
2462            } else {
2463              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2464                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2465            }
2466          }
2467        };
2468        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2469        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2470      });
2471    return future;
2472  }
2473
2474  @Override
2475  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2476    Map<String, String> props) {
2477    ProcedureDescription proDesc =
2478      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2479    return this.<byte[]> newMasterCaller()
2480      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2481        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2482        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2483        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2484      .call();
2485  }
2486
2487  @Override
2488  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2489    Map<String, String> props) {
2490    ProcedureDescription proDesc =
2491      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2492    return this.<Boolean> newMasterCaller()
2493      .action(
2494        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2495          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2496          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2497      .call();
2498  }
2499
2500  @Override
2501  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2502    return this.<Boolean> newMasterCaller().action(
2503      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2504        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2505        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2506      .call();
2507  }
2508
2509  @Override
2510  public CompletableFuture<String> getProcedures() {
2511    return this.<String> newMasterCaller()
2512      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2513        controller, stub, GetProceduresRequest.newBuilder().build(),
2514        (s, c, req, done) -> s.getProcedures(c, req, done),
2515        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2516      .call();
2517  }
2518
2519  @Override
2520  public CompletableFuture<String> getLocks() {
2521    return this.<String> newMasterCaller()
2522      .action(
2523        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2524          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2525          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2526      .call();
2527  }
2528
2529  @Override
2530  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2531    boolean offload) {
2532    return this.<Void> newMasterCaller()
2533      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2534        DecommissionRegionServersResponse, Void> call(controller, stub,
2535          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2536          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2537      .call();
2538  }
2539
2540  @Override
2541  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2542    return this.<List<ServerName>> newMasterCaller()
2543      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2544        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2545          ListDecommissionedRegionServersRequest.newBuilder().build(),
2546          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2547          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2548            .collect(Collectors.toList())))
2549      .call();
2550  }
2551
2552  @Override
2553  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2554    List<byte[]> encodedRegionNames) {
2555    return this.<Void> newMasterCaller()
2556      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2557        RecommissionRegionServerResponse, Void> call(controller, stub,
2558          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2559          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2560      .call();
2561  }
2562
2563  /**
2564   * Get the region location for the passed region name. The region name may be a full region name
2565   * or encoded region name. If the region does not found, then it'll throw an
2566   * UnknownRegionException wrapped by a {@link CompletableFuture}
2567   * @param regionNameOrEncodedRegionName region name or encoded region name
2568   * @return region location, wrapped by a {@link CompletableFuture}
2569   */
2570  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2571    if (regionNameOrEncodedRegionName == null) {
2572      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2573    }
2574
2575    CompletableFuture<Optional<HRegionLocation>> future;
2576    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2577      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2578      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2579        // old format encodedName, should be meta region
2580        future = connection.registry.getMetaRegionLocations()
2581          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2582            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2583      } else {
2584        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2585          regionNameOrEncodedRegionName);
2586      }
2587    } else {
2588      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2589      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2590      RegionInfo regionInfo;
2591      try {
2592        regionInfo =
2593          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2594      } catch (IOException ioe) {
2595        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2596      }
2597
2598      if (regionInfo.isMetaRegion()) {
2599        future = connection.registry.getMetaRegionLocations()
2600          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2601            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2602            .findFirst());
2603      } else {
2604        future =
2605          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2606      }
2607    }
2608
2609    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2610    addListener(future, (location, err) -> {
2611      if (err != null) {
2612        returnedFuture.completeExceptionally(err);
2613        return;
2614      }
2615      if (!location.isPresent() || location.get().getRegion() == null) {
2616        returnedFuture.completeExceptionally(
2617          new UnknownRegionException("Invalid region name or encoded region name: "
2618            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2619      } else {
2620        returnedFuture.complete(location.get());
2621      }
2622    });
2623    return returnedFuture;
2624  }
2625
2626  /**
2627   * Get the region info for the passed region name. The region name may be a full region name or
2628   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2629   * wrapped by a {@link CompletableFuture}
2630   * @return region info, wrapped by a {@link CompletableFuture}
2631   */
2632  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2633    if (regionNameOrEncodedRegionName == null) {
2634      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2635    }
2636
2637    if (
2638      Bytes.equals(regionNameOrEncodedRegionName,
2639        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2640        || Bytes.equals(regionNameOrEncodedRegionName,
2641          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2642    ) {
2643      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2644    }
2645
2646    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2647    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2648      if (err != null) {
2649        future.completeExceptionally(err);
2650      } else {
2651        future.complete(location.getRegion());
2652      }
2653    });
2654    return future;
2655  }
2656
2657  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2658    if (numRegions < 3) {
2659      throw new IllegalArgumentException("Must create at least three regions");
2660    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2661      throw new IllegalArgumentException("Start key must be smaller than end key");
2662    }
2663    if (numRegions == 3) {
2664      return new byte[][] { startKey, endKey };
2665    }
2666    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2667    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2668      throw new IllegalArgumentException("Unable to split key range into enough regions");
2669    }
2670    return splitKeys;
2671  }
2672
2673  private void verifySplitKeys(byte[][] splitKeys) {
2674    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2675    // Verify there are no duplicate split keys
2676    byte[] lastKey = null;
2677    for (byte[] splitKey : splitKeys) {
2678      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2679        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2680      }
2681      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2682        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2683          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2684      }
2685      lastKey = splitKey;
2686    }
2687  }
2688
2689  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2690
2691    abstract void onFinished();
2692
2693    abstract void onError(Throwable error);
2694
2695    @Override
2696    public void accept(Void v, Throwable error) {
2697      if (error != null) {
2698        onError(error);
2699        return;
2700      }
2701      onFinished();
2702    }
2703  }
2704
2705  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2706    protected final TableName tableName;
2707
2708    TableProcedureBiConsumer(TableName tableName) {
2709      this.tableName = tableName;
2710    }
2711
2712    abstract String getOperationType();
2713
2714    String getDescription() {
2715      return "Operation: " + getOperationType() + ", " + "Table Name: "
2716        + tableName.getNameWithNamespaceInclAsString();
2717    }
2718
2719    @Override
2720    void onFinished() {
2721      LOG.info(getDescription() + " completed");
2722    }
2723
2724    @Override
2725    void onError(Throwable error) {
2726      LOG.info(getDescription() + " failed with " + error.getMessage());
2727    }
2728  }
2729
2730  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2731    protected final String namespaceName;
2732
2733    NamespaceProcedureBiConsumer(String namespaceName) {
2734      this.namespaceName = namespaceName;
2735    }
2736
2737    abstract String getOperationType();
2738
2739    String getDescription() {
2740      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2741    }
2742
2743    @Override
2744    void onFinished() {
2745      LOG.info(getDescription() + " completed");
2746    }
2747
2748    @Override
2749    void onError(Throwable error) {
2750      LOG.info(getDescription() + " failed with " + error.getMessage());
2751    }
2752  }
2753
2754  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2755
2756    CreateTableProcedureBiConsumer(TableName tableName) {
2757      super(tableName);
2758    }
2759
2760    @Override
2761    String getOperationType() {
2762      return "CREATE";
2763    }
2764  }
2765
2766  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2767
2768    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2769      super(tableName);
2770    }
2771
2772    @Override
2773    String getOperationType() {
2774      return "MODIFY";
2775    }
2776  }
2777
2778  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2779    extends TableProcedureBiConsumer {
2780
2781    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2782      super(tableName);
2783    }
2784
2785    @Override
2786    String getOperationType() {
2787      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2788    }
2789  }
2790
2791  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2792
2793    DeleteTableProcedureBiConsumer(TableName tableName) {
2794      super(tableName);
2795    }
2796
2797    @Override
2798    String getOperationType() {
2799      return "DELETE";
2800    }
2801
2802    @Override
2803    void onFinished() {
2804      connection.getLocator().clearCache(this.tableName);
2805      super.onFinished();
2806    }
2807  }
2808
2809  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2810
2811    TruncateTableProcedureBiConsumer(TableName tableName) {
2812      super(tableName);
2813    }
2814
2815    @Override
2816    String getOperationType() {
2817      return "TRUNCATE";
2818    }
2819  }
2820
2821  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2822
2823    EnableTableProcedureBiConsumer(TableName tableName) {
2824      super(tableName);
2825    }
2826
2827    @Override
2828    String getOperationType() {
2829      return "ENABLE";
2830    }
2831  }
2832
2833  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2834
2835    DisableTableProcedureBiConsumer(TableName tableName) {
2836      super(tableName);
2837    }
2838
2839    @Override
2840    String getOperationType() {
2841      return "DISABLE";
2842    }
2843  }
2844
2845  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2846
2847    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2848      super(tableName);
2849    }
2850
2851    @Override
2852    String getOperationType() {
2853      return "ADD_COLUMN_FAMILY";
2854    }
2855  }
2856
2857  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2858
2859    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2860      super(tableName);
2861    }
2862
2863    @Override
2864    String getOperationType() {
2865      return "DELETE_COLUMN_FAMILY";
2866    }
2867  }
2868
2869  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2870
2871    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2872      super(tableName);
2873    }
2874
2875    @Override
2876    String getOperationType() {
2877      return "MODIFY_COLUMN_FAMILY";
2878    }
2879  }
2880
2881  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2882    extends TableProcedureBiConsumer {
2883
2884    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2885      super(tableName);
2886    }
2887
2888    @Override
2889    String getOperationType() {
2890      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2891    }
2892  }
2893
2894  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2895
2896    FlushTableProcedureBiConsumer(TableName tableName) {
2897      super(tableName);
2898    }
2899
2900    @Override
2901    String getOperationType() {
2902      return "FLUSH";
2903    }
2904  }
2905
2906  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2907
2908    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2909      super(namespaceName);
2910    }
2911
2912    @Override
2913    String getOperationType() {
2914      return "CREATE_NAMESPACE";
2915    }
2916  }
2917
2918  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2919
2920    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2921      super(namespaceName);
2922    }
2923
2924    @Override
2925    String getOperationType() {
2926      return "DELETE_NAMESPACE";
2927    }
2928  }
2929
2930  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2931
2932    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2933      super(namespaceName);
2934    }
2935
2936    @Override
2937    String getOperationType() {
2938      return "MODIFY_NAMESPACE";
2939    }
2940  }
2941
2942  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2943
2944    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2945      super(tableName);
2946    }
2947
2948    @Override
2949    String getOperationType() {
2950      return "MERGE_REGIONS";
2951    }
2952  }
2953
2954  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2955
2956    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2957      super(tableName);
2958    }
2959
2960    @Override
2961    String getOperationType() {
2962      return "SPLIT_REGION";
2963    }
2964  }
2965
2966  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2967
2968    TruncateRegionProcedureBiConsumer(TableName tableName) {
2969      super(tableName);
2970    }
2971
2972    @Override
2973    String getOperationType() {
2974      return "TRUNCATE_REGION";
2975    }
2976  }
2977
2978  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2979    SnapshotProcedureBiConsumer(TableName tableName) {
2980      super(tableName);
2981    }
2982
2983    @Override
2984    String getOperationType() {
2985      return "SNAPSHOT";
2986    }
2987  }
2988
2989  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2990    private final String peerId;
2991    private final Supplier<String> getOperation;
2992
2993    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2994      this.peerId = peerId;
2995      this.getOperation = getOperation;
2996    }
2997
2998    String getDescription() {
2999      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
3000    }
3001
3002    @Override
3003    void onFinished() {
3004      LOG.info(getDescription() + " completed");
3005    }
3006
3007    @Override
3008    void onError(Throwable error) {
3009      LOG.info(getDescription() + " failed with " + error.getMessage());
3010    }
3011  }
3012
3013  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
3014    CompletableFuture<Void> future = new CompletableFuture<>();
3015    addListener(procFuture, (procId, error) -> {
3016      if (error != null) {
3017        future.completeExceptionally(error);
3018        return;
3019      }
3020      getProcedureResult(procId, future, 0);
3021    });
3022    return future;
3023  }
3024
3025  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
3026    addListener(
3027      this.<GetProcedureResultResponse> newMasterCaller()
3028        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3029          GetProcedureResultResponse> call(controller, stub,
3030            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3031            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3032        .call(),
3033      (response, error) -> {
3034        if (error != null) {
3035          LOG.warn("failed to get the procedure result procId={}", procId,
3036            ConnectionUtils.translateException(error));
3037          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3038            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3039          return;
3040        }
3041        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3042          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3043            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3044          return;
3045        }
3046        if (response.hasException()) {
3047          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3048          future.completeExceptionally(ioe);
3049        } else {
3050          future.complete(null);
3051        }
3052      });
3053  }
3054
3055  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3056    CompletableFuture<T> future = new CompletableFuture<>();
3057    future.completeExceptionally(error);
3058    return future;
3059  }
3060
3061  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3062    if (error != null) {
3063      future.completeExceptionally(error);
3064      return true;
3065    }
3066    return false;
3067  }
3068
3069  @Override
3070  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3071    return getClusterMetrics(EnumSet.allOf(Option.class));
3072  }
3073
3074  @Override
3075  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3076    return this.<ClusterMetrics> newMasterCaller()
3077      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3078        ClusterMetrics> call(controller, stub,
3079          RequestConverter.buildGetClusterStatusRequest(options),
3080          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3081          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3082      .call();
3083  }
3084
3085  @Override
3086  public CompletableFuture<Void> shutdown() {
3087    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3088      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3089        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3090        resp -> null))
3091      .call();
3092  }
3093
3094  @Override
3095  public CompletableFuture<Void> stopMaster() {
3096    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3097      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3098        controller, stub, StopMasterRequest.newBuilder().build(),
3099        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3100      .call();
3101  }
3102
3103  @Override
3104  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3105    StopServerRequest request = RequestConverter
3106      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3107    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3108      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3109        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3110        resp -> null))
3111      .serverName(serverName).call();
3112  }
3113
3114  @Override
3115  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3116    return this.<Void> newAdminCaller()
3117      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3118        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3119          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3120      .serverName(serverName).call();
3121  }
3122
3123  @Override
3124  public CompletableFuture<Void> updateConfiguration() {
3125    CompletableFuture<Void> future = new CompletableFuture<Void>();
3126    addListener(
3127      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3128      (status, err) -> {
3129        if (err != null) {
3130          future.completeExceptionally(err);
3131        } else {
3132          List<CompletableFuture<Void>> futures = new ArrayList<>();
3133          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3134          futures.add(updateConfiguration(status.getMasterName()));
3135          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3136          addListener(
3137            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3138            (result, err2) -> {
3139              if (err2 != null) {
3140                future.completeExceptionally(err2);
3141              } else {
3142                future.complete(result);
3143              }
3144            });
3145        }
3146      });
3147    return future;
3148  }
3149
3150  @Override
3151  public CompletableFuture<Void> updateConfiguration(String groupName) {
3152    CompletableFuture<Void> future = new CompletableFuture<Void>();
3153    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3154      if (err != null) {
3155        future.completeExceptionally(err);
3156      } else if (rsGroupInfo == null) {
3157        future.completeExceptionally(
3158          new IllegalArgumentException("Group does not exist: " + groupName));
3159      } else {
3160        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3161          if (err2 != null) {
3162            future.completeExceptionally(err2);
3163          } else {
3164            List<CompletableFuture<Void>> futures = new ArrayList<>();
3165            List<ServerName> groupServers = status.getServersName().stream()
3166              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3167            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3168            addListener(
3169              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3170              (result, err3) -> {
3171                if (err3 != null) {
3172                  future.completeExceptionally(err3);
3173                } else {
3174                  future.complete(result);
3175                }
3176              });
3177          }
3178        });
3179      }
3180    });
3181    return future;
3182  }
3183
3184  @Override
3185  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3186    return this.<Void> newAdminCaller()
3187      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3188        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3189          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3190      .serverName(serverName).call();
3191  }
3192
3193  @Override
3194  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3195    return this.<Void> newAdminCaller()
3196      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3197        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3198          RequestConverter.buildClearCompactionQueuesRequest(queues),
3199          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3200      .serverName(serverName).call();
3201  }
3202
3203  @Override
3204  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3205    return this.<List<SecurityCapability>> newMasterCaller()
3206      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3207        List<SecurityCapability>> call(controller, stub,
3208          SecurityCapabilitiesRequest.newBuilder().build(),
3209          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3210          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3211      .call();
3212  }
3213
3214  @Override
3215  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3216    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3217  }
3218
3219  @Override
3220  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3221    TableName tableName) {
3222    Preconditions.checkNotNull(tableName,
3223      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3224    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3225  }
3226
3227  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3228    ServerName serverName) {
3229    return this.<List<RegionMetrics>> newAdminCaller()
3230      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3231        List<RegionMetrics>> adminCall(controller, stub, request,
3232          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3233          RegionMetricsBuilder::toRegionMetrics))
3234      .serverName(serverName).call();
3235  }
3236
3237  @Override
3238  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3239    return this.<Boolean> newMasterCaller()
3240      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3241        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3242          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3243          resp -> resp.getInMaintenanceMode()))
3244      .call();
3245  }
3246
3247  @Override
3248  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3249    CompactType compactType) {
3250    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3251
3252    switch (compactType) {
3253      case MOB:
3254        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3255          if (err != null) {
3256            future.completeExceptionally(err);
3257            return;
3258          }
3259          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3260
3261          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3262            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3263              GetRegionInfoResponse> adminCall(controller, stub,
3264                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3265                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3266            .call(), (resp2, err2) -> {
3267              if (err2 != null) {
3268                future.completeExceptionally(err2);
3269              } else {
3270                if (resp2.hasCompactionState()) {
3271                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3272                } else {
3273                  future.complete(CompactionState.NONE);
3274                }
3275              }
3276            });
3277        });
3278        break;
3279      case NORMAL:
3280        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3281          if (err != null) {
3282            future.completeExceptionally(err);
3283            return;
3284          }
3285          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3286          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3287          locations.stream().filter(loc -> loc.getServerName() != null)
3288            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3289            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3290              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3291                // If any region compaction state is MAJOR_AND_MINOR
3292                // the table compaction state is MAJOR_AND_MINOR, too.
3293                if (err2 != null) {
3294                  future.completeExceptionally(unwrapCompletionException(err2));
3295                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3296                  future.complete(regionState);
3297                } else {
3298                  regionStates.add(regionState);
3299                }
3300              }));
3301            });
3302          addListener(
3303            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3304            (ret, err3) -> {
3305              // If future not completed, check all regions's compaction state
3306              if (!future.isCompletedExceptionally() && !future.isDone()) {
3307                CompactionState state = CompactionState.NONE;
3308                for (CompactionState regionState : regionStates) {
3309                  switch (regionState) {
3310                    case MAJOR:
3311                      if (state == CompactionState.MINOR) {
3312                        future.complete(CompactionState.MAJOR_AND_MINOR);
3313                      } else {
3314                        state = CompactionState.MAJOR;
3315                      }
3316                      break;
3317                    case MINOR:
3318                      if (state == CompactionState.MAJOR) {
3319                        future.complete(CompactionState.MAJOR_AND_MINOR);
3320                      } else {
3321                        state = CompactionState.MINOR;
3322                      }
3323                      break;
3324                    case NONE:
3325                    default:
3326                  }
3327                }
3328                if (!future.isDone()) {
3329                  future.complete(state);
3330                }
3331              }
3332            });
3333        });
3334        break;
3335      default:
3336        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3337    }
3338
3339    return future;
3340  }
3341
3342  @Override
3343  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3344    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3345    addListener(getRegionLocation(regionName), (location, err) -> {
3346      if (err != null) {
3347        future.completeExceptionally(err);
3348        return;
3349      }
3350      ServerName serverName = location.getServerName();
3351      if (serverName == null) {
3352        future
3353          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3354        return;
3355      }
3356      addListener(
3357        this.<GetRegionInfoResponse> newAdminCaller()
3358          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3359            GetRegionInfoResponse> adminCall(controller, stub,
3360              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3361                true),
3362              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3363          .serverName(serverName).call(),
3364        (resp2, err2) -> {
3365          if (err2 != null) {
3366            future.completeExceptionally(err2);
3367          } else {
3368            if (resp2.hasCompactionState()) {
3369              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3370            } else {
3371              future.complete(CompactionState.NONE);
3372            }
3373          }
3374        });
3375    });
3376    return future;
3377  }
3378
3379  @Override
3380  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3381    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3382      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3383    return this.<Optional<Long>> newMasterCaller()
3384      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3385        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3386          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3387          ProtobufUtil::toOptionalTimestamp))
3388      .call();
3389  }
3390
3391  @Override
3392  public CompletableFuture<Optional<Long>>
3393    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3394    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3395    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3396    addListener(getRegionInfo(regionName), (region, err) -> {
3397      if (err != null) {
3398        future.completeExceptionally(err);
3399        return;
3400      }
3401      MajorCompactionTimestampForRegionRequest.Builder builder =
3402        MajorCompactionTimestampForRegionRequest.newBuilder();
3403      builder.setRegion(
3404        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3405      addListener(this.<Optional<Long>> newMasterCaller()
3406        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3407          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3408            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3409            ProtobufUtil::toOptionalTimestamp))
3410        .call(), (timestamp, err2) -> {
3411          if (err2 != null) {
3412            future.completeExceptionally(err2);
3413          } else {
3414            future.complete(timestamp);
3415          }
3416        });
3417    });
3418    return future;
3419  }
3420
3421  @Override
3422  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3423    List<String> serverNamesList) {
3424    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3425    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3426      if (err != null) {
3427        future.completeExceptionally(err);
3428        return;
3429      }
3430      // Accessed by multiple threads.
3431      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3432      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3433      serverNames.stream().forEach(serverName -> {
3434        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3435          if (err2 != null) {
3436            future.completeExceptionally(unwrapCompletionException(err2));
3437          } else {
3438            serverStates.put(serverName, serverState);
3439          }
3440        }));
3441      });
3442      addListener(
3443        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3444        (ret, err3) -> {
3445          if (!future.isCompletedExceptionally()) {
3446            if (err3 != null) {
3447              future.completeExceptionally(err3);
3448            } else {
3449              future.complete(serverStates);
3450            }
3451          }
3452        });
3453    });
3454    return future;
3455  }
3456
3457  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3458    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3459    if (serverNamesList.isEmpty()) {
3460      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3461        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3462      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3463        if (err != null) {
3464          future.completeExceptionally(err);
3465        } else {
3466          future.complete(clusterMetrics.getServersName());
3467        }
3468      });
3469      return future;
3470    } else {
3471      List<ServerName> serverList = new ArrayList<>();
3472      for (String regionServerName : serverNamesList) {
3473        ServerName serverName = null;
3474        try {
3475          serverName = ServerName.valueOf(regionServerName);
3476        } catch (Exception e) {
3477          future.completeExceptionally(
3478            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3479        }
3480        if (serverName == null) {
3481          future.completeExceptionally(
3482            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3483        } else {
3484          serverList.add(serverName);
3485        }
3486      }
3487      future.complete(serverList);
3488    }
3489    return future;
3490  }
3491
3492  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3493    return this.<Boolean> newAdminCaller().serverName(serverName)
3494      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3495        Boolean> adminCall(controller, stub,
3496          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3497          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3498      .call();
3499  }
3500
3501  @Override
3502  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3503    return this.<Boolean> newMasterCaller()
3504      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3505        Boolean> call(controller, stub,
3506          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3507          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3508          (resp) -> resp.getPrevBalanceValue()))
3509      .call();
3510  }
3511
3512  @Override
3513  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3514    return this.<BalanceResponse> newMasterCaller()
3515      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3516        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3517          (s, c, req, done) -> s.balance(c, req, done),
3518          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3519      .call();
3520  }
3521
3522  @Override
3523  public CompletableFuture<Boolean> isBalancerEnabled() {
3524    return this.<Boolean> newMasterCaller()
3525      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3526        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3527          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3528      .call();
3529  }
3530
3531  @Override
3532  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3533    return this.<Boolean> newMasterCaller()
3534      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3535        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3536          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3537          (resp) -> resp.getPrevNormalizerValue()))
3538      .call();
3539  }
3540
3541  @Override
3542  public CompletableFuture<Boolean> isNormalizerEnabled() {
3543    return this.<Boolean> newMasterCaller()
3544      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3545        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3546          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3547      .call();
3548  }
3549
3550  @Override
3551  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3552    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3553  }
3554
3555  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3556    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3557      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3558  }
3559
3560  @Override
3561  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3562    return this.<Boolean> newMasterCaller()
3563      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3564        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3565          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3566          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3567          (resp) -> resp.getPrevValue()))
3568      .call();
3569  }
3570
3571  @Override
3572  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3573    return this.<Boolean> newMasterCaller()
3574      .action(
3575        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3576          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3577            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3578      .call();
3579  }
3580
3581  @Override
3582  public CompletableFuture<Boolean> runCleanerChore() {
3583    return this.<Boolean> newMasterCaller()
3584      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3585        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3586          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3587          (resp) -> resp.getCleanerChoreRan()))
3588      .call();
3589  }
3590
3591  @Override
3592  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3593    return this.<Boolean> newMasterCaller()
3594      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3595        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3596          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3597      .call();
3598  }
3599
3600  @Override
3601  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3602    return this.<Boolean> newMasterCaller()
3603      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3604        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3605          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3606          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3607      .call();
3608  }
3609
3610  @Override
3611  public CompletableFuture<Integer> runCatalogJanitor() {
3612    return this.<Integer> newMasterCaller()
3613      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3614        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3615          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3616      .call();
3617  }
3618
3619  @Override
3620  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3621    ServiceCaller<S, R> callable) {
3622    MasterCoprocessorRpcChannelImpl channel =
3623      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3624    S stub = stubMaker.apply(channel);
3625    CompletableFuture<R> future = new CompletableFuture<>();
3626    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3627    callable.call(stub, controller, resp -> {
3628      if (controller.failed()) {
3629        future.completeExceptionally(controller.getFailed());
3630      } else {
3631        future.complete(resp);
3632      }
3633    });
3634    return future;
3635  }
3636
3637  @Override
3638  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3639    ServiceCaller<S, R> callable, ServerName serverName) {
3640    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3641      this.<Message> newServerCaller().serverName(serverName));
3642    S stub = stubMaker.apply(channel);
3643    CompletableFuture<R> future = new CompletableFuture<>();
3644    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3645    callable.call(stub, controller, resp -> {
3646      if (controller.failed()) {
3647        future.completeExceptionally(controller.getFailed());
3648      } else {
3649        future.complete(resp);
3650      }
3651    });
3652    return future;
3653  }
3654
3655  @Override
3656  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3657    return this.<List<ServerName>> newMasterCaller()
3658      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3659        List<ServerName>> call(controller, stub,
3660          RequestConverter.buildClearDeadServersRequest(servers),
3661          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3662          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3663      .call();
3664  }
3665
  /**
   * Creates a server request caller builder from the connection's caller factory, configured
   * with this admin's rpc timeout, operation timeout, pause, server-overloaded pause, max
   * attempts and start-log-errors count. All durations are supplied in nanoseconds.
   * @param <T> result type of the call the builder will eventually produce
   * @return the configured builder; the caller still has to set the target server and action
   */
  <T> ServerRequestCallerBuilder<T> newServerCaller() {
    return this.connection.callerFactory.<T> serverRequest()
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS)
      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
  }
3674
3675  @Override
3676  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3677    if (tableName == null) {
3678      return failedFuture(new IllegalArgumentException("Table name is null"));
3679    }
3680    CompletableFuture<Void> future = new CompletableFuture<>();
3681    addListener(tableExists(tableName), (exist, err) -> {
3682      if (err != null) {
3683        future.completeExceptionally(err);
3684        return;
3685      }
3686      if (!exist) {
3687        future.completeExceptionally(new TableNotFoundException(
3688          "Table '" + tableName.getNameAsString() + "' does not exists."));
3689        return;
3690      }
3691      addListener(getTableSplits(tableName), (splits, err1) -> {
3692        if (err1 != null) {
3693          future.completeExceptionally(err1);
3694        } else {
3695          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3696            if (err2 != null) {
3697              future.completeExceptionally(err2);
3698            } else {
3699              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3700                if (err3 != null) {
3701                  future.completeExceptionally(err3);
3702                } else {
3703                  future.complete(result3);
3704                }
3705              });
3706            }
3707          });
3708        }
3709      });
3710    });
3711    return future;
3712  }
3713
3714  @Override
3715  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3716    if (tableName == null) {
3717      return failedFuture(new IllegalArgumentException("Table name is null"));
3718    }
3719    CompletableFuture<Void> future = new CompletableFuture<>();
3720    addListener(tableExists(tableName), (exist, err) -> {
3721      if (err != null) {
3722        future.completeExceptionally(err);
3723        return;
3724      }
3725      if (!exist) {
3726        future.completeExceptionally(new TableNotFoundException(
3727          "Table '" + tableName.getNameAsString() + "' does not exists."));
3728        return;
3729      }
3730      addListener(setTableReplication(tableName, false), (result, err2) -> {
3731        if (err2 != null) {
3732          future.completeExceptionally(err2);
3733        } else {
3734          future.complete(result);
3735        }
3736      });
3737    });
3738    return future;
3739  }
3740
3741  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3742    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3743    addListener(
3744      getRegions(tableName).thenApply(regions -> regions.stream()
3745        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3746      (regions, err2) -> {
3747        if (err2 != null) {
3748          future.completeExceptionally(err2);
3749          return;
3750        }
3751        if (regions.size() == 1) {
3752          future.complete(null);
3753        } else {
3754          byte[][] splits = new byte[regions.size() - 1][];
3755          for (int i = 1; i < regions.size(); i++) {
3756            splits[i - 1] = regions.get(i).getStartKey();
3757          }
3758          future.complete(splits);
3759        }
3760      });
3761    return future;
3762  }
3763
3764  /**
3765   * Connect to peer and check the table descriptor on peer:
3766   * <ol>
3767   * <li>Create the same table on peer when not exist.</li>
3768   * <li>Throw an exception if the table already has replication enabled on any of the column
3769   * families.</li>
3770   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3771   * </ol>
3772   * @param tableName name of the table to sync to the peer
3773   * @param splits    table split keys
3774   */
3775  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3776    byte[][] splits) {
3777    CompletableFuture<Void> future = new CompletableFuture<>();
3778    addListener(listReplicationPeers(), (peers, err) -> {
3779      if (err != null) {
3780        future.completeExceptionally(err);
3781        return;
3782      }
3783      if (peers == null || peers.size() <= 0) {
3784        future.completeExceptionally(
3785          new IllegalArgumentException("Found no peer cluster for replication."));
3786        return;
3787      }
3788      List<CompletableFuture<Void>> futures = new ArrayList<>();
3789      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3790        .forEach(peer -> {
3791          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3792        });
3793      addListener(
3794        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3795        (result, err2) -> {
3796          if (err2 != null) {
3797            future.completeExceptionally(err2);
3798          } else {
3799            future.complete(result);
3800          }
3801        });
3802    });
3803    return future;
3804  }
3805
3806  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3807    ReplicationPeerDescription peer) {
3808    Configuration peerConf;
3809    try {
3810      peerConf = ReplicationPeerConfigUtil
3811        .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
3812    } catch (IOException e) {
3813      return failedFuture(e);
3814    }
3815    URI connectionUri =
3816      ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
3817    CompletableFuture<Void> future = new CompletableFuture<>();
3818    addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
3819      if (err != null) {
3820        future.completeExceptionally(err);
3821        return;
3822      }
3823      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3824        if (err1 != null) {
3825          future.completeExceptionally(err1);
3826          return;
3827        }
3828        AsyncAdmin peerAdmin = conn.getAdmin();
3829        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3830          if (err2 != null) {
3831            future.completeExceptionally(err2);
3832            return;
3833          }
3834          if (!exist) {
3835            CompletableFuture<Void> createTableFuture = null;
3836            if (splits == null) {
3837              createTableFuture = peerAdmin.createTable(tableDesc);
3838            } else {
3839              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3840            }
3841            addListener(createTableFuture, (result, err3) -> {
3842              if (err3 != null) {
3843                future.completeExceptionally(err3);
3844              } else {
3845                future.complete(result);
3846              }
3847            });
3848          } else {
3849            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3850              (result, err4) -> {
3851                if (err4 != null) {
3852                  future.completeExceptionally(err4);
3853                } else {
3854                  future.complete(result);
3855                }
3856              });
3857          }
3858        });
3859      });
3860    });
3861    return future;
3862  }
3863
3864  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3865    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3866    CompletableFuture<Void> future = new CompletableFuture<>();
3867    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3868      if (err != null) {
3869        future.completeExceptionally(err);
3870        return;
3871      }
3872      if (peerTableDesc == null) {
3873        future.completeExceptionally(
3874          new IllegalArgumentException("Failed to get table descriptor for table "
3875            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3876        return;
3877      }
3878      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3879        future.completeExceptionally(new IllegalArgumentException(
3880          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3881            + ", but the table descriptors are not same when compared with source cluster."
3882            + " Thus can not enable the table's replication switch."));
3883        return;
3884      }
3885      future.complete(null);
3886    });
3887    return future;
3888  }
3889
3890  /**
3891   * Set the table's replication switch if the table's replication switch is already not set.
3892   * @param tableName name of the table
3893   * @param enableRep is replication switch enable or disable
3894   */
3895  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3896    CompletableFuture<Void> future = new CompletableFuture<>();
3897    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3898      if (err != null) {
3899        future.completeExceptionally(err);
3900        return;
3901      }
3902      if (!tableDesc.matchReplicationScope(enableRep)) {
3903        int scope =
3904          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3905        TableDescriptor newTableDesc =
3906          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3907        addListener(modifyTable(newTableDesc), (result, err2) -> {
3908          if (err2 != null) {
3909            future.completeExceptionally(err2);
3910          } else {
3911            future.complete(result);
3912          }
3913        });
3914      } else {
3915        future.complete(null);
3916      }
3917    });
3918    return future;
3919  }
3920
3921  @Override
3922  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
3923    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
3924    request.setPeerId(peerId);
3925    return this.<Boolean> newMasterCaller()
3926      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
3927        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
3928          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
3929          resp -> resp.getIsEnabled()))
3930      .call();
3931  }
3932
3933  private void waitUntilAllReplicationPeerModificationProceduresDone(
3934    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
3935    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
3936      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
3937        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
3938          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
3939            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
3940            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
3941            resp -> resp.getProcedureList()))
3942        .call();
3943    addListener(callFuture, (r, e) -> {
3944      if (e != null) {
3945        future.completeExceptionally(e);
3946      } else if (r.isEmpty()) {
3947        // we are done
3948        future.complete(prevOn);
3949      } else {
3950        // retry later to see if the procedures are done
3951        retryTimer.newTimeout(
3952          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
3953          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3954      }
3955    });
3956  }
3957
3958  @Override
3959  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
3960    boolean drainProcedures) {
3961    ReplicationPeerModificationSwitchRequest request =
3962      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
3963    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
3964      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
3965        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
3966          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
3967          resp -> resp.getPreviousValue()))
3968      .call();
3969    // if we do not need to wait all previous peer modification procedure done, or we are enabling
3970    // peer modification, just return here.
3971    if (!drainProcedures || on) {
3972      return callFuture;
3973    }
3974    // otherwise we need to wait until all previous peer modification procedure done
3975    CompletableFuture<Boolean> future = new CompletableFuture<>();
3976    addListener(callFuture, (prevOn, err) -> {
3977      if (err != null) {
3978        future.completeExceptionally(err);
3979        return;
3980      }
3981      // even if the previous state is disabled, we still need to wait here, as there could be
3982      // another client thread which called this method just before us and have already changed the
3983      // state to off, but there are still peer modification procedures not finished, so we should
3984      // also wait here.
3985      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
3986    });
3987    return future;
3988  }
3989
3990  @Override
3991  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
3992    return this.<Boolean> newMasterCaller()
3993      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
3994        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
3995          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
3996          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
3997          (resp) -> resp.getEnabled()))
3998      .call();
3999  }
4000
4001  @Override
4002  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
4003    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
4004    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
4005      if (err != null) {
4006        future.completeExceptionally(err);
4007        return;
4008      }
4009      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
4010        locations.stream().filter(l -> l.getRegion() != null)
4011          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
4012          .collect(Collectors.groupingBy(l -> l.getServerName(),
4013            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
4014      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
4015      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
4016      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
4017        futures
4018          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
4019            if (err2 != null) {
4020              future.completeExceptionally(unwrapCompletionException(err2));
4021            } else {
4022              aggregator.append(stats);
4023            }
4024          }));
4025      }
4026      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4027        (ret, err3) -> {
4028          if (err3 != null) {
4029            future.completeExceptionally(unwrapCompletionException(err3));
4030          } else {
4031            future.complete(aggregator.sum());
4032          }
4033        });
4034    });
4035    return future;
4036  }
4037
4038  @Override
4039  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4040    boolean preserveSplits) {
4041    CompletableFuture<Void> future = new CompletableFuture<>();
4042    addListener(tableExists(tableName), (exist, err) -> {
4043      if (err != null) {
4044        future.completeExceptionally(err);
4045        return;
4046      }
4047      if (!exist) {
4048        future.completeExceptionally(new TableNotFoundException(tableName));
4049        return;
4050      }
4051      addListener(tableExists(newTableName), (exist1, err1) -> {
4052        if (err1 != null) {
4053          future.completeExceptionally(err1);
4054          return;
4055        }
4056        if (exist1) {
4057          future.completeExceptionally(new TableExistsException(newTableName));
4058          return;
4059        }
4060        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4061          if (err2 != null) {
4062            future.completeExceptionally(err2);
4063            return;
4064          }
4065          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4066          if (preserveSplits) {
4067            addListener(getTableSplits(tableName), (splits, err3) -> {
4068              if (err3 != null) {
4069                future.completeExceptionally(err3);
4070              } else {
4071                addListener(
4072                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4073                  (result, err4) -> {
4074                    if (err4 != null) {
4075                      future.completeExceptionally(err4);
4076                    } else {
4077                      future.complete(result);
4078                    }
4079                  });
4080              }
4081            });
4082          } else {
4083            addListener(createTable(newTableDesc), (result, err5) -> {
4084              if (err5 != null) {
4085                future.completeExceptionally(err5);
4086              } else {
4087                future.complete(result);
4088              }
4089            });
4090          }
4091        });
4092      });
4093    });
4094    return future;
4095  }
4096
4097  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4098    List<RegionInfo> hris) {
4099    return this.<CacheEvictionStats> newAdminCaller()
4100      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4101        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4102          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4103          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4104          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4105      .serverName(serverName).call();
4106  }
4107
4108  @Override
4109  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4110    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4111      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4112        Boolean> call(controller, stub,
4113          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4114          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4115          resp -> resp.getPreviousRpcThrottleEnabled()))
4116      .call();
4117    return future;
4118  }
4119
4120  @Override
4121  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4122    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4123      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4124        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4125          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4126          resp -> resp.getRpcThrottleEnabled()))
4127      .call();
4128    return future;
4129  }
4130
4131  @Override
4132  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4133    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4134      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4135        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4136          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4137            .build(),
4138          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4139          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4140      .call();
4141    return future;
4142  }
4143
4144  @Override
4145  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4146    return this.<Map<TableName, Long>> newMasterCaller()
4147      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4148        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4149          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4150          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4151          resp -> resp.getSizesList().stream().collect(Collectors
4152            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4153      .call();
4154  }
4155
4156  @Override
4157  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4158    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4159    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4160      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4161        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4162          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4163          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4164          resp -> resp.getSnapshotsList().stream()
4165            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4166              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4167      .serverName(serverName).call();
4168  }
4169
4170  private CompletableFuture<SpaceQuotaSnapshot>
4171    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4172    return this.<SpaceQuotaSnapshot> newMasterCaller()
4173      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4174        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4175          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4176      .call();
4177  }
4178
4179  @Override
4180  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4181    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4182      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4183      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4184  }
4185
4186  @Override
4187  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4188    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4189    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4190      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4191      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4192  }
4193
4194  @Override
4195  public CompletableFuture<Void> grant(UserPermission userPermission,
4196    boolean mergeExistingPermissions) {
4197    return this.<Void> newMasterCaller()
4198      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4199        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4200        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4201      .call();
4202  }
4203
4204  @Override
4205  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4206    return this.<Void> newMasterCaller()
4207      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4208        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4209        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4210      .call();
4211  }
4212
4213  @Override
4214  public CompletableFuture<List<UserPermission>>
4215    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4216    return this.<List<UserPermission>> newMasterCaller()
4217      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4218        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4219          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4220          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4221          resp -> resp.getUserPermissionList().stream()
4222            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4223            .collect(Collectors.toList())))
4224      .call();
4225  }
4226
4227  @Override
4228  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4229    List<Permission> permissions) {
4230    return this.<List<Boolean>> newMasterCaller()
4231      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4232        List<Boolean>> call(controller, stub,
4233          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4234          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4235          resp -> resp.getHasUserPermissionList()))
4236      .call();
4237  }
4238
4239  @Override
4240  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4241    return this.<Boolean> newMasterCaller()
4242      .action((controller, stub) -> this.call(controller, stub,
4243        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4244        MasterService.Interface::switchSnapshotCleanup,
4245        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4246      .call();
4247  }
4248
4249  @Override
4250  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4251    return this.<Boolean> newMasterCaller()
4252      .action((controller, stub) -> this.call(controller, stub,
4253        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4254        MasterService.Interface::isSnapshotCleanupEnabled,
4255        IsSnapshotCleanupEnabledResponse::getEnabled))
4256      .call();
4257  }
4258
4259  @Override
4260  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4261    return this.<Void> newMasterCaller()
4262      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4263        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4264        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4265      .call();
4266  }
4267
4268  @Override
4269  public CompletableFuture<Void> addRSGroup(String groupName) {
4270    return this.<Void> newMasterCaller()
4271      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4272        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4273        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4274      .call();
4275  }
4276
4277  @Override
4278  public CompletableFuture<Void> removeRSGroup(String groupName) {
4279    return this.<Void> newMasterCaller()
4280      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4281        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4282        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4283      .call();
4284  }
4285
4286  @Override
4287  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4288    BalanceRequest request) {
4289    return this.<BalanceResponse> newMasterCaller()
4290      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4291        BalanceResponse> call(controller, stub,
4292          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4293          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4294      .call();
4295  }
4296
4297  @Override
4298  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4299    return this.<List<RSGroupInfo>> newMasterCaller()
4300      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4301        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4302          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4303            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4304      .call();
4305  }
4306
4307  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4308    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4309    final String logType) {
4310    if (CollectionUtils.isEmpty(serverNames)) {
4311      return CompletableFuture.completedFuture(Collections.emptyList());
4312    }
4313    return CompletableFuture.supplyAsync(() -> serverNames
4314      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4315        filterParams, limit, logType))
4316      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4317  }
4318
4319  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4320    Map<String, Object> filterParams, int limit, String logType) {
4321    return this.<List<LogEntry>> newAdminCaller()
4322      .action((controller, stub) -> this.adminCall(controller, stub,
4323        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4324        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4325      .serverName(serverName).call();
4326  }
4327
4328  @Override
4329  public CompletableFuture<List<Boolean>>
4330    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4331    if (CollectionUtils.isEmpty(serverNames)) {
4332      return CompletableFuture.completedFuture(Collections.emptyList());
4333    }
4334    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4335      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4336    return convertToFutureOfList(clearSlowLogResponseList);
4337  }
4338
4339  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4340    return this.<Boolean> newAdminCaller()
4341      .action((controller, stub) -> this.adminCall(controller, stub,
4342        RequestConverter.buildClearSlowLogResponseRequest(),
4343        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4344      .serverName(serverName).call();
4345  }
4346
4347  private static <T> CompletableFuture<List<T>>
4348    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4349    CompletableFuture<Void> allDoneFuture =
4350      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4351    return allDoneFuture
4352      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4353  }
4354
4355  @Override
4356  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4357    return this.<List<TableName>> newMasterCaller()
4358      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4359        List<TableName>> call(controller, stub,
4360          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4361          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4362            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4363      .call();
4364  }
4365
4366  @Override
4367  public CompletableFuture<Pair<List<String>, List<TableName>>>
4368    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4369    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4370      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4371        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4372        Pair<List<String>, List<TableName>>> call(controller, stub,
4373          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4374            .build(),
4375          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4376          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4377            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4378      .call();
4379  }
4380
4381  @Override
4382  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4383    return this.<RSGroupInfo> newMasterCaller()
4384      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4385        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4386          GetRSGroupInfoOfServerRequest.newBuilder()
4387            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4388              .setPort(hostPort.getPort()).build())
4389            .build(),
4390          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4391          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4392      .call();
4393  }
4394
4395  @Override
4396  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4397    return this.<Void> newMasterCaller()
4398      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4399        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4400        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4401      .call();
4402  }
4403
4404  @Override
4405  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4406    CompletableFuture<Void> future = new CompletableFuture<>();
4407    for (TableName tableName : tables) {
4408      addListener(tableExists(tableName), (exist, err) -> {
4409        if (err != null) {
4410          future.completeExceptionally(err);
4411          return;
4412        }
4413        if (!exist) {
4414          future.completeExceptionally(new TableNotFoundException(tableName));
4415          return;
4416        }
4417      });
4418    }
4419    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4420      if (err != null) {
4421        future.completeExceptionally(err);
4422        return;
4423      }
4424      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4425        future.complete(null);
4426        return;
4427      }
4428      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4429      for (TableDescriptor td : tableDescriptions) {
4430        newTableDescriptors
4431          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4432      }
4433      addListener(
4434        CompletableFuture.allOf(
4435          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4436        (v, e) -> {
4437          if (e != null) {
4438            future.completeExceptionally(e);
4439          } else {
4440            future.complete(v);
4441          }
4442        });
4443    });
4444    return future;
4445  }
4446
4447  @Override
4448  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4449    return this.<RSGroupInfo> newMasterCaller()
4450      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4451        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4452          GetRSGroupInfoOfTableRequest.newBuilder()
4453            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4454          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4455          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4456      .call();
4457  }
4458
4459  @Override
4460  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4461    return this.<RSGroupInfo> newMasterCaller()
4462      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4463        RSGroupInfo> call(controller, stub,
4464          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4465          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4466          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4467      .call();
4468  }
4469
4470  @Override
4471  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4472    return this.<Void> newMasterCaller()
4473      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4474        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4475          .setNewRsgroupName(newName).build(),
4476        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4477      .call();
4478  }
4479
4480  @Override
4481  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4482    Map<String, String> configuration) {
4483    UpdateRSGroupConfigRequest.Builder request =
4484      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4485    if (configuration != null) {
4486      configuration.entrySet().forEach(e -> request.addConfiguration(
4487        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4488    }
4489    return this.<Void> newMasterCaller()
4490      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4491        Void> call(controller, stub, request.build(),
4492          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4493      .call();
4494  }
4495
4496  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4497    return this.<List<LogEntry>> newMasterCaller()
4498      .action((controller, stub) -> this.call(controller, stub,
4499        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4500        ProtobufUtil::toBalancerDecisionResponse))
4501      .call();
4502  }
4503
4504  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4505    return this.<List<LogEntry>> newMasterCaller()
4506      .action((controller, stub) -> this.call(controller, stub,
4507        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4508        ProtobufUtil::toBalancerRejectionResponse))
4509      .call();
4510  }
4511
4512  @Override
4513  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4514    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4515    if (logType == null || serverType == null) {
4516      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4517    }
4518    switch (logType) {
4519      case "SLOW_LOG":
4520      case "LARGE_LOG":
4521        if (ServerType.MASTER.equals(serverType)) {
4522          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4523        }
4524        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4525      case "BALANCER_DECISION":
4526        if (ServerType.REGION_SERVER.equals(serverType)) {
4527          throw new IllegalArgumentException(
4528            "Balancer Decision logs are not maintained by HRegionServer");
4529        }
4530        return getBalancerDecisions(limit);
4531      case "BALANCER_REJECTION":
4532        if (ServerType.REGION_SERVER.equals(serverType)) {
4533          throw new IllegalArgumentException(
4534            "Balancer Rejection logs are not maintained by HRegionServer");
4535        }
4536        return getBalancerRejections(limit);
4537      default:
4538        return CompletableFuture.completedFuture(Collections.emptyList());
4539    }
4540  }
4541
4542  @Override
4543  public CompletableFuture<Void> flushMasterStore() {
4544    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4545    return this.<Void> newMasterCaller()
4546      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4547        Void> call(controller, stub, request.build(),
4548          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4549      .call();
4550  }
4551
4552  @Override
4553  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4554    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4555    return this.<List<String>> newAdminCaller()
4556      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4557        List<String>> adminCall(controller, stub, request.build(),
4558          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4559          resp -> resp.getCachedFilesList()))
4560      .serverName(serverName).call();
4561  }
4562}