001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.net.URI;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.CatalogFamilyFormat;
053import org.apache.hadoop.hbase.ClientMetaTableAccessor;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.NamespaceDescriptor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.RegionMetrics;
063import org.apache.hadoop.hbase.RegionMetricsBuilder;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableExistsException;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotDisabledException;
068import org.apache.hadoop.hbase.TableNotEnabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.Scan.ReadType;
075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
076import org.apache.hadoop.hbase.client.replication.TableCFs;
077import org.apache.hadoop.hbase.client.security.SecurityCapability;
078import org.apache.hadoop.hbase.exceptions.DeserializationException;
079import org.apache.hadoop.hbase.ipc.HBaseRpcController;
080import org.apache.hadoop.hbase.net.Address;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
086import org.apache.hadoop.hbase.replication.ReplicationException;
087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
089import org.apache.hadoop.hbase.replication.SyncReplicationState;
090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
092import org.apache.hadoop.hbase.security.access.Permission;
093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
094import org.apache.hadoop.hbase.security.access.UserPermission;
095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.Strings;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
108import org.apache.hbase.thirdparty.com.google.protobuf.Message;
109import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
110import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
111import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
112import org.apache.hbase.thirdparty.io.netty.util.Timeout;
113import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
114import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
115
116import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
117import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
357import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
358
359/**
360 * The implementation of AsyncAdmin.
361 * <p>
362 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
363 * be finished inside the rpc framework thread, which means that the callbacks registered to the
364 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
365 * this class should not try to do time consuming tasks in the callbacks.
366 * @since 2.0.0
367 * @see AsyncHBaseAdmin
368 * @see AsyncConnection#getAdmin()
369 * @see AsyncConnection#getAdminBuilder()
370 */
371@InterfaceAudience.Private
372class RawAsyncHBaseAdmin implements AsyncAdmin {
373
374  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
375
376  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
377
378  private final AsyncConnectionImpl connection;
379
380  private final HashedWheelTimer retryTimer;
381
382  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
383
384  private final long rpcTimeoutNs;
385
386  private final long operationTimeoutNs;
387
388  private final long pauseNs;
389
390  private final long pauseNsForServerOverloaded;
391
392  private final int maxAttempts;
393
394  private final int startLogErrorsCnt;
395
396  private final NonceGenerator ng;
397
398  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
399    AsyncAdminBuilderBase builder) {
400    this.connection = connection;
401    this.retryTimer = retryTimer;
402    this.metaTable = connection.getTable(META_TABLE_NAME);
403    this.rpcTimeoutNs = builder.rpcTimeoutNs;
404    this.operationTimeoutNs = builder.operationTimeoutNs;
405    this.pauseNs = builder.pauseNs;
406    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
407      LOG.warn(
408        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
409          + " the normal pause value {} ms, use the greater one instead",
410        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
411        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
412      this.pauseNsForServerOverloaded = builder.pauseNs;
413    } else {
414      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
415    }
416    this.maxAttempts = builder.maxAttempts;
417    this.startLogErrorsCnt = builder.startLogErrorsCnt;
418    this.ng = connection.getNonceGenerator();
419  }
420
421  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
422    return this.connection.callerFactory.<T> masterRequest()
423      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
424      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
425      .pause(pauseNs, TimeUnit.NANOSECONDS)
426      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
427      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
428  }
429
430  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
431    return this.connection.callerFactory.<T> adminRequest()
432      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
433      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
434      .pause(pauseNs, TimeUnit.NANOSECONDS)
435      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
436      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
437  }
438
439  @FunctionalInterface
440  private interface MasterRpcCall<RESP, REQ> {
441    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
442      RpcCallback<RESP> done);
443  }
444
445  @FunctionalInterface
446  private interface AdminRpcCall<RESP, REQ> {
447    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
448      RpcCallback<RESP> done);
449  }
450
451  @FunctionalInterface
452  private interface Converter<D, S> {
453    D convert(S src) throws IOException;
454  }
455
456  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
457    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
458    Converter<RESP, PRESP> respConverter) {
459    CompletableFuture<RESP> future = new CompletableFuture<>();
460    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
461
462      @Override
463      public void run(PRESP resp) {
464        if (controller.failed()) {
465          future.completeExceptionally(controller.getFailed());
466        } else {
467          try {
468            future.complete(respConverter.convert(resp));
469          } catch (IOException e) {
470            future.completeExceptionally(e);
471          }
472        }
473      }
474    });
475    return future;
476  }
477
478  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
479    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
480    Converter<RESP, PRESP> respConverter) {
481    CompletableFuture<RESP> future = new CompletableFuture<>();
482    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
483
484      @Override
485      public void run(PRESP resp) {
486        if (controller.failed()) {
487          future.completeExceptionally(controller.getFailed());
488        } else {
489          try {
490            future.complete(respConverter.convert(resp));
491          } catch (IOException e) {
492            future.completeExceptionally(e);
493          }
494        }
495      }
496    });
497    return future;
498  }
499
500  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
501    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
502    ProcedureBiConsumer consumer) {
503    return procedureCall(b -> {
504    }, preq, rpcCall, respConverter, consumer);
505  }
506
507  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
508    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
509    ProcedureBiConsumer consumer) {
510    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
511  }
512
513  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
514    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
515    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
516    ProcedureBiConsumer consumer) {
517    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
518      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
519    prioritySetter.accept(builder);
520    CompletableFuture<Long> procFuture = builder.call();
521    CompletableFuture<Void> future = waitProcedureResult(procFuture);
522    addListener(future, consumer);
523    return future;
524  }
525
526  @Override
527  public CompletableFuture<Boolean> tableExists(TableName tableName) {
528    if (TableName.isMetaTableName(tableName)) {
529      return CompletableFuture.completedFuture(true);
530    }
531    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
532  }
533
534  @Override
535  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
536    return getTableDescriptors(
537      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
538  }
539
540  /**
541   * {@link #listTableDescriptors(boolean)}
542   */
543  @Override
544  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
545    boolean includeSysTables) {
546    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
547      + "use listTableDescriptors(boolean) instead");
548    return getTableDescriptors(
549      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
550  }
551
552  @Override
553  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
554    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
555      + "use listTableDescriptors(boolean) instead");
556    if (tableNames.isEmpty()) {
557      return CompletableFuture.completedFuture(Collections.emptyList());
558    }
559    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
560  }
561
562  private CompletableFuture<List<TableDescriptor>>
563    getTableDescriptors(GetTableDescriptorsRequest request) {
564    return this.<List<TableDescriptor>> newMasterCaller()
565      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
566        List<TableDescriptor>> call(controller, stub, request,
567          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
568          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
569      .call();
570  }
571
572  @Override
573  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
574    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
575  }
576
577  @Override
578  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
579    boolean includeSysTables) {
580    Preconditions.checkNotNull(pattern,
581      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
582    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
583  }
584
585  @Override
586  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
587    return this.<List<TableName>> newMasterCaller()
588      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
589        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
590          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
591          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
592          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
593      .call();
594  }
595
596  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
597    return this.<List<TableName>> newMasterCaller()
598      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
599        List<TableName>> call(controller, stub, request,
600          (s, c, req, done) -> s.getTableNames(c, req, done),
601          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
602      .call();
603  }
604
605  @Override
606  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
607    return this.<List<TableDescriptor>> newMasterCaller()
608      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
609        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
610          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
611          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
612          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
613      .call();
614  }
615
616  @Override
617  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
618    return this.<List<TableDescriptor>> newMasterCaller()
619      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
620        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
621          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
622          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
623          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
624      .call();
625  }
626
627  @Override
628  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
629    return this.<List<TableName>> newMasterCaller()
630      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
631        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
632          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
633          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
634          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
635      .call();
636  }
637
638  @Override
639  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
640    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
641    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
642      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
643        List<TableSchema>> call(controller, stub,
644          RequestConverter.buildGetTableDescriptorsRequest(tableName),
645          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
646          (resp) -> resp.getTableSchemaList()))
647      .call(), (tableSchemas, error) -> {
648        if (error != null) {
649          future.completeExceptionally(error);
650          return;
651        }
652        if (!tableSchemas.isEmpty()) {
653          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
654        } else {
655          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
656        }
657      });
658    return future;
659  }
660
661  @Override
662  public CompletableFuture<Void> createTable(TableDescriptor desc) {
663    return createTable(desc.getTableName(),
664      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
665  }
666
667  @Override
668  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
669    int numRegions) {
670    try {
671      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
672    } catch (IllegalArgumentException e) {
673      return failedFuture(e);
674    }
675  }
676
677  @Override
678  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
679    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
680      + " use createTable(TableDescriptor) instead");
681    try {
682      verifySplitKeys(splitKeys);
683      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
684        splitKeys, ng.getNonceGroup(), ng.newNonce()));
685    } catch (IllegalArgumentException e) {
686      return failedFuture(e);
687    }
688  }
689
690  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
691    Preconditions.checkNotNull(tableName, "table name is null");
692    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
693      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
694      new CreateTableProcedureBiConsumer(tableName));
695  }
696
697  @Override
698  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
699    return modifyTable(desc, true);
700  }
701
702  @Override
703  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
704    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
705      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
706        ng.newNonce(), reopenRegions),
707      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
708      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
709  }
710
711  @Override
712  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
713    return this.<ModifyTableStoreFileTrackerRequest,
714      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
715        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
716          ng.getNonceGroup(), ng.newNonce()),
717        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
718        (resp) -> resp.getProcId(),
719        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
720  }
721
722  @Override
723  public CompletableFuture<Void> deleteTable(TableName tableName) {
724    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
725      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
726      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
727      new DeleteTableProcedureBiConsumer(tableName));
728  }
729
730  @Override
731  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
732    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
733      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
734        ng.newNonce()),
735      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
736      new TruncateTableProcedureBiConsumer(tableName));
737  }
738
739  @Override
740  public CompletableFuture<Void> enableTable(TableName tableName) {
741    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
742      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
743      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
744      new EnableTableProcedureBiConsumer(tableName));
745  }
746
747  @Override
748  public CompletableFuture<Void> disableTable(TableName tableName) {
749    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
750      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
751      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
752      new DisableTableProcedureBiConsumer(tableName));
753  }
754
755  /**
756   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
757   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
758   * targetState).
759   */
760  private static CompletableFuture<Boolean> completeCheckTableState(
761    CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error,
762    TableState.State targetState, TableName tableName) {
763    if (error != null) {
764      future.completeExceptionally(error);
765    } else {
766      if (tableState.isPresent()) {
767        future.complete(tableState.get().inStates(targetState));
768      } else {
769        future.completeExceptionally(new TableNotFoundException(tableName));
770      }
771    }
772    return future;
773  }
774
775  @Override
776  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
777    if (TableName.isMetaTableName(tableName)) {
778      return CompletableFuture.completedFuture(true);
779    }
780    CompletableFuture<Boolean> future = new CompletableFuture<>();
781    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
782      (tableState, error) -> {
783        completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName);
784      });
785    return future;
786  }
787
788  @Override
789  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
790    if (TableName.isMetaTableName(tableName)) {
791      return CompletableFuture.completedFuture(false);
792    }
793    CompletableFuture<Boolean> future = new CompletableFuture<>();
794    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
795      (tableState, error) -> {
796        completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName);
797      });
798    return future;
799  }
800
801  @Override
802  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
803    if (TableName.isMetaTableName(tableName)) {
804      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
805        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
806    }
807    CompletableFuture<Boolean> future = new CompletableFuture<>();
808    addListener(isTableEnabled(tableName), (enabled, error) -> {
809      if (error != null) {
810        if (error instanceof TableNotFoundException) {
811          future.complete(false);
812        } else {
813          future.completeExceptionally(error);
814        }
815        return;
816      }
817      if (!enabled) {
818        future.complete(false);
819      } else {
820        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
821          (locations, error1) -> {
822            if (error1 != null) {
823              future.completeExceptionally(error1);
824              return;
825            }
826            List<HRegionLocation> notDeployedRegions = locations.stream()
827              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
828            if (notDeployedRegions.size() > 0) {
829              if (LOG.isDebugEnabled()) {
830                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
831              }
832              future.complete(false);
833              return;
834            }
835            future.complete(true);
836          });
837      }
838    });
839    return future;
840  }
841
842  @Override
843  public CompletableFuture<Void> addColumnFamily(TableName tableName,
844    ColumnFamilyDescriptor columnFamily) {
845    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
846      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
847        ng.newNonce()),
848      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
849      new AddColumnFamilyProcedureBiConsumer(tableName));
850  }
851
852  @Override
853  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
854    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
855      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
856        ng.newNonce()),
857      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
858      new DeleteColumnFamilyProcedureBiConsumer(tableName));
859  }
860
861  @Override
862  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
863    ColumnFamilyDescriptor columnFamily) {
864    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
865      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
866        ng.newNonce()),
867      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
868      new ModifyColumnFamilyProcedureBiConsumer(tableName));
869  }
870
871  @Override
872  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
873    byte[] family, String dstSFT) {
874    return this.<ModifyColumnStoreFileTrackerRequest,
875      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
876        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
877          ng.getNonceGroup(), ng.newNonce()),
878        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
879        (resp) -> resp.getProcId(),
880        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
881  }
882
883  @Override
884  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
885    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
886      RequestConverter.buildCreateNamespaceRequest(descriptor),
887      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
888      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
889  }
890
891  @Override
892  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
893    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
894      RequestConverter.buildModifyNamespaceRequest(descriptor),
895      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
896      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
897  }
898
899  @Override
900  public CompletableFuture<Void> deleteNamespace(String name) {
901    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
902      RequestConverter.buildDeleteNamespaceRequest(name),
903      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
904      new DeleteNamespaceProcedureBiConsumer(name));
905  }
906
907  @Override
908  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
909    return this.<NamespaceDescriptor> newMasterCaller()
910      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
911        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
912          RequestConverter.buildGetNamespaceDescriptorRequest(name),
913          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
914          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
915      .call();
916  }
917
918  @Override
919  public CompletableFuture<List<String>> listNamespaces() {
920    return this.<List<String>> newMasterCaller()
921      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
922        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
923          (s, c, req, done) -> s.listNamespaces(c, req, done),
924          (resp) -> resp.getNamespaceNameList()))
925      .call();
926  }
927
928  @Override
929  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
930    return this.<List<NamespaceDescriptor>> newMasterCaller()
931      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
932        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
933          ListNamespaceDescriptorsRequest.newBuilder().build(),
934          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
935          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
936      .call();
937  }
938
939  @Override
940  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
941    return this.<List<RegionInfo>> newAdminCaller()
942      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
943        List<RegionInfo>> adminCall(controller, stub,
944          RequestConverter.buildGetOnlineRegionRequest(),
945          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
946          resp -> ProtobufUtil.getRegionInfos(resp)))
947      .serverName(serverName).call();
948  }
949
950  @Override
951  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
952    if (tableName.equals(META_TABLE_NAME)) {
953      return connection.registry.getMetaRegionLocations()
954        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
955          .collect(Collectors.toList()));
956    } else {
957      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
958        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
959    }
960  }
961
962  @Override
963  public CompletableFuture<Void> flush(TableName tableName) {
964    return flush(tableName, Collections.emptyList());
965  }
966
967  @Override
968  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
969    Preconditions.checkNotNull(columnFamily,
970      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
971    return flush(tableName, Collections.singletonList(columnFamily));
972  }
973
974  @Override
975  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
976    // This is for keeping compatibility with old implementation.
977    // If the server version is lower than the client version, it's possible that the
978    // flushTable method is not present in the server side, if so, we need to fall back
979    // to the old implementation.
980    Preconditions.checkNotNull(columnFamilyList,
981      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
982    List<byte[]> columnFamilies = columnFamilyList.stream()
983      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
984    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
985      ng.getNonceGroup(), ng.newNonce());
986    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
987      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
988      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
989    CompletableFuture<Void> future = new CompletableFuture<>();
990    addListener(procFuture, (ret, error) -> {
991      if (error != null) {
992        if (
993          error instanceof TableNotFoundException || error instanceof TableNotEnabledException
994            || error instanceof NoSuchColumnFamilyException
995        ) {
996          future.completeExceptionally(error);
997        } else if (error instanceof DoNotRetryIOException) {
998          // usually this is caused by the method is not present on the server or
999          // the hbase hadoop version does not match the running hadoop version.
1000          // if that happens, we need fall back to the old flush implementation.
1001          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
1002          legacyFlush(future, tableName, columnFamilies);
1003        } else {
1004          future.completeExceptionally(error);
1005        }
1006      } else {
1007        future.complete(ret);
1008      }
1009    });
1010    return future;
1011  }
1012
1013  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1014    List<byte[]> columnFamilies) {
1015    addListener(tableExists(tableName), (exists, err) -> {
1016      if (err != null) {
1017        future.completeExceptionally(err);
1018      } else if (!exists) {
1019        future.completeExceptionally(new TableNotFoundException(tableName));
1020      } else {
1021        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1022          if (err2 != null) {
1023            future.completeExceptionally(err2);
1024          } else if (!tableEnabled) {
1025            future.completeExceptionally(new TableNotEnabledException(tableName));
1026          } else {
1027            Map<String, String> props = new HashMap<>();
1028            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1029              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1030                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1031            }
1032            addListener(
1033              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1034              (ret, err3) -> {
1035                if (err3 != null) {
1036                  future.completeExceptionally(err3);
1037                } else {
1038                  future.complete(ret);
1039                }
1040              });
1041          }
1042        });
1043      }
1044    });
1045  }
1046
1047  @Override
1048  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1049    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1050    });
1051  }
1052
1053  @Override
1054  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1055    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1056      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1057    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1058    });
1059  }
1060
1061  /**
1062   * This method is for internal use only, where we need the response of the flush.
1063   * <p/>
1064   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1065   * API.
1066   */
1067  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1068    boolean writeFlushWALMarker) {
1069    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1070    addListener(getRegionLocation(regionName), (location, err) -> {
1071      if (err != null) {
1072        future.completeExceptionally(err);
1073        return;
1074      }
1075      ServerName serverName = location.getServerName();
1076      if (serverName == null) {
1077        future
1078          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1079        return;
1080      }
1081      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1082        (ret, err2) -> {
1083          if (err2 != null) {
1084            future.completeExceptionally(err2);
1085          } else {
1086            future.complete(ret);
1087          }
1088        });
1089    });
1090    return future;
1091  }
1092
1093  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1094    byte[] columnFamily, boolean writeFlushWALMarker) {
1095    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1096      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1097        FlushRegionResponse> adminCall(controller, stub,
1098          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1099            writeFlushWALMarker),
1100          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1101      .call();
1102  }
1103
1104  @Override
1105  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1106    CompletableFuture<Void> future = new CompletableFuture<>();
1107    addListener(getRegions(sn), (hRegionInfos, err) -> {
1108      if (err != null) {
1109        future.completeExceptionally(err);
1110        return;
1111      }
1112      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1113      if (hRegionInfos != null) {
1114        hRegionInfos
1115          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1116          })));
1117      }
1118      addListener(CompletableFuture.allOf(
1119        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1120          if (err2 != null) {
1121            future.completeExceptionally(err2);
1122          } else {
1123            future.complete(ret);
1124          }
1125        });
1126    });
1127    return future;
1128  }
1129
1130  @Override
1131  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1132    return compact(tableName, null, false, compactType);
1133  }
1134
1135  @Override
1136  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1137    CompactType compactType) {
1138    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1139      + "If you don't specify a columnFamily, use compact(TableName) instead");
1140    return compact(tableName, columnFamily, false, compactType);
1141  }
1142
1143  @Override
1144  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1145    return compactRegion(regionName, null, false);
1146  }
1147
1148  @Override
1149  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1150    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1151      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1152    return compactRegion(regionName, columnFamily, false);
1153  }
1154
1155  @Override
1156  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1157    return compact(tableName, null, true, compactType);
1158  }
1159
1160  @Override
1161  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1162    CompactType compactType) {
1163    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1164      + "If you don't specify a columnFamily, use compact(TableName) instead");
1165    return compact(tableName, columnFamily, true, compactType);
1166  }
1167
1168  @Override
1169  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1170    return compactRegion(regionName, null, true);
1171  }
1172
1173  @Override
1174  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1175    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1176      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1177    return compactRegion(regionName, columnFamily, true);
1178  }
1179
1180  @Override
1181  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1182    return compactRegionServer(sn, false);
1183  }
1184
1185  @Override
1186  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1187    return compactRegionServer(sn, true);
1188  }
1189
1190  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1191    CompletableFuture<Void> future = new CompletableFuture<>();
1192    addListener(getRegions(sn), (hRegionInfos, err) -> {
1193      if (err != null) {
1194        future.completeExceptionally(err);
1195        return;
1196      }
1197      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1198      if (hRegionInfos != null) {
1199        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1200      }
1201      addListener(CompletableFuture.allOf(
1202        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1203          if (err2 != null) {
1204            future.completeExceptionally(err2);
1205          } else {
1206            future.complete(ret);
1207          }
1208        });
1209    });
1210    return future;
1211  }
1212
1213  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1214    boolean major) {
1215    CompletableFuture<Void> future = new CompletableFuture<>();
1216    addListener(getRegionLocation(regionName), (location, err) -> {
1217      if (err != null) {
1218        future.completeExceptionally(err);
1219        return;
1220      }
1221      ServerName serverName = location.getServerName();
1222      if (serverName == null) {
1223        future
1224          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1225        return;
1226      }
1227      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1228        (ret, err2) -> {
1229          if (err2 != null) {
1230            future.completeExceptionally(err2);
1231          } else {
1232            future.complete(ret);
1233          }
1234        });
1235    });
1236    return future;
1237  }
1238
1239  /**
1240   * List all region locations for the specific table.
1241   */
1242  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1243    if (TableName.META_TABLE_NAME.equals(tableName)) {
1244      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1245      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1246        if (err != null) {
1247          future.completeExceptionally(err);
1248        } else if (
1249          metaRegions == null || metaRegions.isEmpty()
1250            || metaRegions.getDefaultRegionLocation() == null
1251        ) {
1252          future.completeExceptionally(new IOException("meta region does not found"));
1253        } else {
1254          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1255        }
1256      });
1257      return future;
1258    } else {
1259      // For non-meta table, we fetch all locations by scanning hbase:meta table
1260      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1261    }
1262  }
1263
1264  /**
1265   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1266   */
1267  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1268    CompactType compactType) {
1269    CompletableFuture<Void> future = new CompletableFuture<>();
1270
1271    switch (compactType) {
1272      case MOB:
1273        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1274          if (err != null) {
1275            future.completeExceptionally(err);
1276            return;
1277          }
1278          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1279          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1280            if (err2 != null) {
1281              future.completeExceptionally(err2);
1282            } else {
1283              future.complete(ret);
1284            }
1285          });
1286        });
1287        break;
1288      case NORMAL:
1289        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1290          if (err != null) {
1291            future.completeExceptionally(err);
1292            return;
1293          }
1294          if (locations == null || locations.isEmpty()) {
1295            future.completeExceptionally(new TableNotFoundException(tableName));
1296          }
1297          CompletableFuture<?>[] compactFutures =
1298            locations.stream().filter(l -> l.getRegion() != null)
1299              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1300              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1301              .toArray(CompletableFuture<?>[]::new);
1302          // future complete unless all of the compact futures are completed.
1303          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1304            if (err2 != null) {
1305              future.completeExceptionally(err2);
1306            } else {
1307              future.complete(ret);
1308            }
1309          });
1310        });
1311        break;
1312      default:
1313        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1314    }
1315    return future;
1316  }
1317
1318  /**
1319   * Compact the region at specific region server.
1320   */
1321  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1322    final boolean major, byte[] columnFamily) {
1323    return this.<Void> newAdminCaller().serverName(sn)
1324      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1325        Void> adminCall(controller, stub,
1326          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1327          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1328      .call();
1329  }
1330
1331  private byte[] toEncodeRegionName(byte[] regionName) {
1332    return RegionInfo.isEncodedRegionName(regionName)
1333      ? regionName
1334      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1335  }
1336
1337  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1338    CompletableFuture<TableName> result) {
1339    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1340      if (err != null) {
1341        result.completeExceptionally(err);
1342        return;
1343      }
1344      RegionInfo regionInfo = location.getRegion();
1345      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1346        result.completeExceptionally(
1347          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1348        return;
1349      }
1350      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1351        if (!tableName.get().equals(regionInfo.getTable())) {
1352          // tables of this two region should be same.
1353          result.completeExceptionally(
1354            new IllegalArgumentException("Cannot merge regions from two different tables "
1355              + tableName.get() + " and " + regionInfo.getTable()));
1356        } else {
1357          result.complete(tableName.get());
1358        }
1359      }
1360    });
1361  }
1362
1363  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1364    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1365    CompletableFuture<TableName> future = new CompletableFuture<>();
1366    for (byte[] encodedRegionName : encodedRegionNames) {
1367      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1368    }
1369    return future;
1370  }
1371
1372  @Override
1373  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1374    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1375  }
1376
1377  @Override
1378  public CompletableFuture<Boolean> isMergeEnabled() {
1379    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1380  }
1381
1382  @Override
1383  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1384    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1385  }
1386
1387  @Override
1388  public CompletableFuture<Boolean> isSplitEnabled() {
1389    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1390  }
1391
1392  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1393    MasterSwitchType switchType) {
1394    SetSplitOrMergeEnabledRequest request =
1395      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1396    return this.<Boolean> newMasterCaller()
1397      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1398        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1399          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1400          (resp) -> resp.getPrevValueList().get(0)))
1401      .call();
1402  }
1403
1404  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1405    IsSplitOrMergeEnabledRequest request =
1406      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1407    return this.<Boolean> newMasterCaller()
1408      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1409        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1410          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1411      .call();
1412  }
1413
1414  @Override
1415  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1416    if (nameOfRegionsToMerge.size() < 2) {
1417      return failedFuture(new IllegalArgumentException(
1418        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1419    }
1420    CompletableFuture<Void> future = new CompletableFuture<>();
1421    byte[][] encodedNameOfRegionsToMerge =
1422      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1423
1424    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1425      if (err != null) {
1426        future.completeExceptionally(err);
1427        return;
1428      }
1429
1430      final MergeTableRegionsRequest request;
1431      try {
1432        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1433          forcible, ng.getNonceGroup(), ng.newNonce());
1434      } catch (DeserializationException e) {
1435        future.completeExceptionally(e);
1436        return;
1437      }
1438
1439      addListener(
1440        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1441          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1442        (ret, err2) -> {
1443          if (err2 != null) {
1444            future.completeExceptionally(err2);
1445          } else {
1446            future.complete(ret);
1447          }
1448        });
1449    });
1450    return future;
1451  }
1452
1453  @Override
1454  public CompletableFuture<Void> split(TableName tableName) {
1455    CompletableFuture<Void> future = new CompletableFuture<>();
1456    addListener(tableExists(tableName), (exist, error) -> {
1457      if (error != null) {
1458        future.completeExceptionally(error);
1459        return;
1460      }
1461      if (!exist) {
1462        future.completeExceptionally(new TableNotFoundException(tableName));
1463        return;
1464      }
1465      addListener(metaTable
1466        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1467          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1468            ClientMetaTableAccessor.QueryType.REGION))
1469          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1470            ClientMetaTableAccessor.QueryType.REGION))),
1471        (results, err2) -> {
1472          if (err2 != null) {
1473            future.completeExceptionally(err2);
1474            return;
1475          }
1476          if (results != null && !results.isEmpty()) {
1477            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1478            for (Result r : results) {
1479              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1480                continue;
1481              }
1482              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1483              if (rl != null) {
1484                for (HRegionLocation h : rl.getRegionLocations()) {
1485                  if (h != null && h.getServerName() != null) {
1486                    RegionInfo hri = h.getRegion();
1487                    if (
1488                      hri == null || hri.isSplitParent()
1489                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1490                    ) {
1491                      continue;
1492                    }
1493                    splitFutures.add(split(hri, null));
1494                  }
1495                }
1496              }
1497            }
1498            addListener(
1499              CompletableFuture
1500                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1501              (ret, exception) -> {
1502                if (exception != null) {
1503                  future.completeExceptionally(exception);
1504                  return;
1505                }
1506                future.complete(ret);
1507              });
1508          } else {
1509            future.complete(null);
1510          }
1511        });
1512    });
1513    return future;
1514  }
1515
1516  @Override
1517  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1518    CompletableFuture<Void> result = new CompletableFuture<>();
1519    if (splitPoint == null) {
1520      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1521    }
1522    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1523      (loc, err) -> {
1524        if (err != null) {
1525          result.completeExceptionally(err);
1526        } else if (loc == null || loc.getRegion() == null) {
1527          result.completeExceptionally(new IllegalArgumentException(
1528            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1529        } else {
1530          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1531            if (err2 != null) {
1532              result.completeExceptionally(err2);
1533            } else {
1534              result.complete(ret);
1535            }
1536
1537          });
1538        }
1539      });
1540    return result;
1541  }
1542
1543  @Override
1544  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1545    CompletableFuture<Void> future = new CompletableFuture<>();
1546    addListener(getRegionLocation(regionName), (location, err) -> {
1547      if (err != null) {
1548        future.completeExceptionally(err);
1549        return;
1550      }
1551      RegionInfo regionInfo = location.getRegion();
1552      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1553        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1554          + "Replicas are auto-split when their primary is split."));
1555        return;
1556      }
1557      ServerName serverName = location.getServerName();
1558      if (serverName == null) {
1559        future
1560          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1561        return;
1562      }
1563      addListener(split(regionInfo, null), (ret, err2) -> {
1564        if (err2 != null) {
1565          future.completeExceptionally(err2);
1566        } else {
1567          future.complete(ret);
1568        }
1569      });
1570    });
1571    return future;
1572  }
1573
1574  @Override
1575  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1576    Preconditions.checkNotNull(splitPoint,
1577      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1578    CompletableFuture<Void> future = new CompletableFuture<>();
1579    addListener(getRegionLocation(regionName), (location, err) -> {
1580      if (err != null) {
1581        future.completeExceptionally(err);
1582        return;
1583      }
1584      RegionInfo regionInfo = location.getRegion();
1585      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1586        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1587          + "Replicas are auto-split when their primary is split."));
1588        return;
1589      }
1590      ServerName serverName = location.getServerName();
1591      if (serverName == null) {
1592        future
1593          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1594        return;
1595      }
1596      if (
1597        regionInfo.getStartKey() != null
1598          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1599      ) {
1600        future.completeExceptionally(
1601          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1602        return;
1603      }
1604      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1605        if (err2 != null) {
1606          future.completeExceptionally(err2);
1607        } else {
1608          future.complete(ret);
1609        }
1610      });
1611    });
1612    return future;
1613  }
1614
1615  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1616    CompletableFuture<Void> future = new CompletableFuture<>();
1617    TableName tableName = hri.getTable();
1618    final SplitTableRegionRequest request;
1619    try {
1620      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1621        ng.newNonce());
1622    } catch (DeserializationException e) {
1623      future.completeExceptionally(e);
1624      return future;
1625    }
1626
1627    addListener(
1628      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1629        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1630      (ret, err2) -> {
1631        if (err2 != null) {
1632          future.completeExceptionally(err2);
1633        } else {
1634          future.complete(ret);
1635        }
1636      });
1637    return future;
1638  }
1639
1640  @Override
1641  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1642    CompletableFuture<Void> future = new CompletableFuture<>();
1643    addListener(getRegionLocation(regionName), (location, err) -> {
1644      if (err != null) {
1645        future.completeExceptionally(err);
1646        return;
1647      }
1648      RegionInfo regionInfo = location.getRegion();
1649      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1650        future.completeExceptionally(new IllegalArgumentException(
1651          "Can't truncate replicas directly.Replicas are auto-truncated "
1652            + "when their primary is truncated."));
1653        return;
1654      }
1655      ServerName serverName = location.getServerName();
1656      if (serverName == null) {
1657        future
1658          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1659        return;
1660      }
1661      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1662        if (err2 != null) {
1663          future.completeExceptionally(err2);
1664        } else {
1665          future.complete(ret);
1666        }
1667      });
1668    });
1669    return future;
1670  }
1671
1672  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1673    CompletableFuture<Void> future = new CompletableFuture<>();
1674    TableName tableName = hri.getTable();
1675    final MasterProtos.TruncateRegionRequest request;
1676    try {
1677      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1678    } catch (DeserializationException e) {
1679      future.completeExceptionally(e);
1680      return future;
1681    }
1682    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1683      MasterProtos.TruncateRegionResponse::getProcId,
1684      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1685        if (err2 != null) {
1686          future.completeExceptionally(err2);
1687        } else {
1688          future.complete(ret);
1689        }
1690      });
1691    return future;
1692  }
1693
1694  @Override
1695  public CompletableFuture<Void> assign(byte[] regionName) {
1696    CompletableFuture<Void> future = new CompletableFuture<>();
1697    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1698      if (err != null) {
1699        future.completeExceptionally(err);
1700        return;
1701      }
1702      addListener(
1703        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1704          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1705            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1706            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1707          .call(),
1708        (ret, err2) -> {
1709          if (err2 != null) {
1710            future.completeExceptionally(err2);
1711          } else {
1712            future.complete(ret);
1713          }
1714        });
1715    });
1716    return future;
1717  }
1718
1719  @Override
1720  public CompletableFuture<Void> unassign(byte[] regionName) {
1721    CompletableFuture<Void> future = new CompletableFuture<>();
1722    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1723      if (err != null) {
1724        future.completeExceptionally(err);
1725        return;
1726      }
1727      addListener(
1728        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1729          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1730            Void> call(controller, stub,
1731              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1732              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1733          .call(),
1734        (ret, err2) -> {
1735          if (err2 != null) {
1736            future.completeExceptionally(err2);
1737          } else {
1738            future.complete(ret);
1739          }
1740        });
1741    });
1742    return future;
1743  }
1744
1745  @Override
1746  public CompletableFuture<Void> offline(byte[] regionName) {
1747    CompletableFuture<Void> future = new CompletableFuture<>();
1748    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1749      if (err != null) {
1750        future.completeExceptionally(err);
1751        return;
1752      }
1753      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1754        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1755          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1756          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1757        .call(), (ret, err2) -> {
1758          if (err2 != null) {
1759            future.completeExceptionally(err2);
1760          } else {
1761            future.complete(ret);
1762          }
1763        });
1764    });
1765    return future;
1766  }
1767
1768  @Override
1769  public CompletableFuture<Void> move(byte[] regionName) {
1770    CompletableFuture<Void> future = new CompletableFuture<>();
1771    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1772      if (err != null) {
1773        future.completeExceptionally(err);
1774        return;
1775      }
1776      addListener(
1777        moveRegion(regionInfo,
1778          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1779        (ret, err2) -> {
1780          if (err2 != null) {
1781            future.completeExceptionally(err2);
1782          } else {
1783            future.complete(ret);
1784          }
1785        });
1786    });
1787    return future;
1788  }
1789
1790  @Override
1791  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1792    Preconditions.checkNotNull(destServerName,
1793      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1794    CompletableFuture<Void> future = new CompletableFuture<>();
1795    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1796      if (err != null) {
1797        future.completeExceptionally(err);
1798        return;
1799      }
1800      addListener(
1801        moveRegion(regionInfo, RequestConverter
1802          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1803        (ret, err2) -> {
1804          if (err2 != null) {
1805            future.completeExceptionally(err2);
1806          } else {
1807            future.complete(ret);
1808          }
1809        });
1810    });
1811    return future;
1812  }
1813
1814  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1815    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1816      .action(
1817        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1818          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1819      .call();
1820  }
1821
1822  @Override
1823  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1824    return this.<Void> newMasterCaller()
1825      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1826        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1827        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1828      .call();
1829  }
1830
1831  @Override
1832  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1833    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1834    Scan scan = QuotaTableUtil.makeScan(filter);
1835    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1836      new AdvancedScanResultConsumer() {
1837        List<QuotaSettings> settings = new ArrayList<>();
1838
1839        @Override
1840        public void onNext(Result[] results, ScanController controller) {
1841          for (Result result : results) {
1842            try {
1843              QuotaTableUtil.parseResultToCollection(result, settings);
1844            } catch (IOException e) {
1845              controller.terminate();
1846              future.completeExceptionally(e);
1847            }
1848          }
1849        }
1850
1851        @Override
1852        public void onError(Throwable error) {
1853          future.completeExceptionally(error);
1854        }
1855
1856        @Override
1857        public void onComplete() {
1858          future.complete(settings);
1859        }
1860      });
1861    return future;
1862  }
1863
1864  @Override
1865  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1866    boolean enabled) {
1867    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1868      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1869      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1870      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1871  }
1872
1873  @Override
1874  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1875    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1876      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1877      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1878      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1879  }
1880
1881  @Override
1882  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1883    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1884      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1885      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1886      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1887  }
1888
1889  @Override
1890  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1891    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1892      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1893      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1894      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1895  }
1896
1897  @Override
1898  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1899    return this.<ReplicationPeerConfig> newMasterCaller()
1900      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1901        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1902          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1903          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1904          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1905      .call();
1906  }
1907
1908  @Override
1909  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1910    ReplicationPeerConfig peerConfig) {
1911    return this.<UpdateReplicationPeerConfigRequest,
1912      UpdateReplicationPeerConfigResponse> procedureCall(
1913        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1914        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1915        (resp) -> resp.getProcId(),
1916        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1917  }
1918
1919  @Override
1920  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1921    SyncReplicationState clusterState) {
1922    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1923      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1924        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1925          clusterState),
1926        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1927        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1928          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1929  }
1930
1931  @Override
1932  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1933    Map<TableName, List<String>> tableCfs) {
1934    if (tableCfs == null) {
1935      return failedFuture(new ReplicationException("tableCfs is null"));
1936    }
1937
1938    CompletableFuture<Void> future = new CompletableFuture<Void>();
1939    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1940      if (!completeExceptionally(future, error)) {
1941        ReplicationPeerConfig newPeerConfig =
1942          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1943        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1944          if (!completeExceptionally(future, error)) {
1945            future.complete(result);
1946          }
1947        });
1948      }
1949    });
1950    return future;
1951  }
1952
1953  @Override
1954  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1955    Map<TableName, List<String>> tableCfs) {
1956    if (tableCfs == null) {
1957      return failedFuture(new ReplicationException("tableCfs is null"));
1958    }
1959
1960    CompletableFuture<Void> future = new CompletableFuture<Void>();
1961    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1962      if (!completeExceptionally(future, error)) {
1963        ReplicationPeerConfig newPeerConfig = null;
1964        try {
1965          newPeerConfig = ReplicationPeerConfigUtil
1966            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1967        } catch (ReplicationException e) {
1968          future.completeExceptionally(e);
1969          return;
1970        }
1971        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1972          if (!completeExceptionally(future, error)) {
1973            future.complete(result);
1974          }
1975        });
1976      }
1977    });
1978    return future;
1979  }
1980
1981  @Override
1982  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1983    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1984  }
1985
1986  @Override
1987  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1988    Preconditions.checkNotNull(pattern,
1989      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1990    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1991  }
1992
1993  private CompletableFuture<List<ReplicationPeerDescription>>
1994    listReplicationPeers(ListReplicationPeersRequest request) {
1995    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1996      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1997        List<ReplicationPeerDescription>> call(controller, stub, request,
1998          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1999          (resp) -> resp.getPeerDescList().stream()
2000            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
2001            .collect(Collectors.toList())))
2002      .call();
2003  }
2004
2005  @Override
2006  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2007    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2008    addListener(listTableDescriptors(), (tables, error) -> {
2009      if (!completeExceptionally(future, error)) {
2010        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2011        tables.forEach(table -> {
2012          Map<String, Integer> cfs = new HashMap<>();
2013          Stream.of(table.getColumnFamilies())
2014            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2015            .forEach(column -> {
2016              cfs.put(column.getNameAsString(), column.getScope());
2017            });
2018          if (!cfs.isEmpty()) {
2019            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2020          }
2021        });
2022        future.complete(replicatedTableCFs);
2023      }
2024    });
2025    return future;
2026  }
2027
2028  @Override
2029  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2030    SnapshotProtos.SnapshotDescription snapshot =
2031      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2032    try {
2033      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2034    } catch (IllegalArgumentException e) {
2035      return failedFuture(e);
2036    }
2037    CompletableFuture<Void> future = new CompletableFuture<>();
2038    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2039      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2040    addListener(this.<SnapshotResponse> newMasterCaller()
2041      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2042        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2043      .call(), (resp, err) -> {
2044        if (err != null) {
2045          future.completeExceptionally(err);
2046          return;
2047        }
2048        waitSnapshotFinish(snapshotDesc, future, resp);
2049      });
2050    return future;
2051  }
2052
2053  // This is for keeping compatibility with old implementation.
2054  // If there is a procId field in the response, then the snapshot will be operated with a
2055  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
2056  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
2057    SnapshotResponse resp) {
2058    if (resp.hasProcId()) {
2059      getProcedureResult(resp.getProcId(), future, 0);
2060      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
2061    } else {
2062      long expectedTimeout = resp.getExpectedTimeout();
2063      TimerTask pollingTask = new TimerTask() {
2064        int tries = 0;
2065        long startTime = EnvironmentEdgeManager.currentTime();
2066        long endTime = startTime + expectedTimeout;
2067        long maxPauseTime = expectedTimeout / maxAttempts;
2068
2069        @Override
2070        public void run(Timeout timeout) throws Exception {
2071          if (EnvironmentEdgeManager.currentTime() < endTime) {
2072            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
2073              if (err2 != null) {
2074                future.completeExceptionally(err2);
2075              } else if (done) {
2076                future.complete(null);
2077              } else {
2078                // retry again after pauseTime.
2079                long pauseTime =
2080                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2081                pauseTime = Math.min(pauseTime, maxPauseTime);
2082                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
2083              }
2084            });
2085          } else {
2086            future
2087              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
2088                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
2089          }
2090        }
2091      };
2092      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2093    }
2094  }
2095
2096  @Override
2097  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2098    return this.<Boolean> newMasterCaller()
2099      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2100        Boolean> call(controller, stub,
2101          IsSnapshotDoneRequest.newBuilder()
2102            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2103          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2104      .call();
2105  }
2106
2107  @Override
2108  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2109    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2110      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2111      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2112    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2113  }
2114
2115  @Override
2116  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2117    boolean restoreAcl) {
2118    CompletableFuture<Void> future = new CompletableFuture<>();
2119    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2120      if (err != null) {
2121        future.completeExceptionally(err);
2122        return;
2123      }
2124      TableName tableName = null;
2125      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2126        for (SnapshotDescription snap : snapshotDescriptions) {
2127          if (snap.getName().equals(snapshotName)) {
2128            tableName = snap.getTableName();
2129            break;
2130          }
2131        }
2132      }
2133      if (tableName == null) {
2134        future.completeExceptionally(
2135          new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist."));
2136        return;
2137      }
2138      final TableName finalTableName = tableName;
2139      addListener(tableExists(finalTableName), (exists, err2) -> {
2140        if (err2 != null) {
2141          future.completeExceptionally(err2);
2142        } else if (!exists) {
2143          // if table does not exist, then just clone snapshot into new table.
2144          completeConditionalOnFuture(future,
2145            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2146        } else {
2147          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2148            if (err4 != null) {
2149              future.completeExceptionally(err4);
2150            } else if (!disabled) {
2151              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2152            } else {
2153              completeConditionalOnFuture(future,
2154                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2155            }
2156          });
2157        }
2158      });
2159    });
2160    return future;
2161  }
2162
2163  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2164    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2165    if (takeFailSafeSnapshot) {
2166      CompletableFuture<Void> future = new CompletableFuture<>();
2167      // Step.1 Take a snapshot of the current state
2168      String failSafeSnapshotSnapshotNameFormat =
2169        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2170          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2171      final String failSafeSnapshotSnapshotName =
2172        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2173          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2174          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2175      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2176      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2177        if (err != null) {
2178          future.completeExceptionally(err);
2179        } else {
2180          // Step.2 Restore snapshot
2181          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2182            (void2, err2) -> {
2183              if (err2 != null) {
2184                // Step.3.a Something went wrong during the restore and try to rollback.
2185                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2186                  restoreAcl, null), (void3, err3) -> {
2187                    if (err3 != null) {
2188                      future.completeExceptionally(err3);
2189                    } else {
2190                      // If fail to restore snapshot but rollback successfully, delete the
2191                      // restore-failsafe snapshot.
2192                      LOG.info(
2193                        "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2194                      addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> {
2195                        if (err4 != null) {
2196                          LOG.error("Unable to remove the failsafe snapshot: {}",
2197                            failSafeSnapshotSnapshotName, err4);
2198                        }
2199                        String msg =
2200                          "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot="
2201                            + failSafeSnapshotSnapshotName + " succeeded.";
2202                        LOG.error(msg);
2203                        future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2204                      });
2205                    }
2206                  });
2207              } else {
2208                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2209                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2210                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2211                  if (err3 != null) {
2212                    LOG.error(
2213                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2214                      err3);
2215                  }
2216                  future.complete(ret3);
2217                });
2218              }
2219            });
2220        }
2221      });
2222      return future;
2223    } else {
2224      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2225    }
2226  }
2227
2228  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2229    CompletableFuture<T> parentFuture) {
2230    addListener(parentFuture, (res, err) -> {
2231      if (err != null) {
2232        dependentFuture.completeExceptionally(err);
2233      } else {
2234        dependentFuture.complete(res);
2235      }
2236    });
2237  }
2238
2239  @Override
2240  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2241    boolean restoreAcl, String customSFT) {
2242    CompletableFuture<Void> future = new CompletableFuture<>();
2243    addListener(tableExists(tableName), (exists, err) -> {
2244      if (err != null) {
2245        future.completeExceptionally(err);
2246      } else if (exists) {
2247        future.completeExceptionally(new TableExistsException(tableName));
2248      } else {
2249        completeConditionalOnFuture(future,
2250          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2251      }
2252    });
2253    return future;
2254  }
2255
2256  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2257    boolean restoreAcl, String customSFT) {
2258    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2259      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2260    try {
2261      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2262    } catch (IllegalArgumentException e) {
2263      return failedFuture(e);
2264    }
2265    RestoreSnapshotRequest.Builder builder =
2266      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2267        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2268    if (customSFT != null) {
2269      builder.setCustomSFT(customSFT);
2270    }
2271    return waitProcedureResult(this.<Long> newMasterCaller()
2272      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2273        Long> call(controller, stub, builder.build(),
2274          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2275      .call());
2276  }
2277
2278  @Override
2279  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2280    return getCompletedSnapshots(null);
2281  }
2282
2283  @Override
2284  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2285    Preconditions.checkNotNull(pattern,
2286      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2287    return getCompletedSnapshots(pattern);
2288  }
2289
2290  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2291    return this.<List<SnapshotDescription>> newMasterCaller()
2292      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2293        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2294          GetCompletedSnapshotsRequest.newBuilder().build(),
2295          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2296          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2297      .call();
2298  }
2299
2300  @Override
2301  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2302    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2303      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2304    return getCompletedSnapshots(tableNamePattern, null);
2305  }
2306
2307  @Override
2308  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2309    Pattern snapshotNamePattern) {
2310    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2311      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2312    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2313      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2314    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2315  }
2316
2317  private CompletableFuture<List<SnapshotDescription>>
2318    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2319    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2320    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2321      if (err != null) {
2322        future.completeExceptionally(err);
2323        return;
2324      }
2325      if (tableNames == null || tableNames.size() <= 0) {
2326        future.complete(Collections.emptyList());
2327        return;
2328      }
2329      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2330        if (err2 != null) {
2331          future.completeExceptionally(err2);
2332          return;
2333        }
2334        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2335          future.complete(Collections.emptyList());
2336          return;
2337        }
2338        future.complete(snapshotDescList.stream()
2339          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2340          .collect(Collectors.toList()));
2341      });
2342    });
2343    return future;
2344  }
2345
2346  @Override
2347  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2348    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2349  }
2350
2351  @Override
2352  public CompletableFuture<Void> deleteSnapshots() {
2353    return internalDeleteSnapshots(null, null);
2354  }
2355
2356  @Override
2357  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2358    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2359      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2360    return internalDeleteSnapshots(null, snapshotNamePattern);
2361  }
2362
2363  @Override
2364  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2365    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2366      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2367    return internalDeleteSnapshots(tableNamePattern, null);
2368  }
2369
2370  @Override
2371  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2372    Pattern snapshotNamePattern) {
2373    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2374      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2375    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2376      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2377    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2378  }
2379
2380  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2381    Pattern snapshotNamePattern) {
2382    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2383    if (tableNamePattern == null) {
2384      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2385    } else {
2386      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2387    }
2388    CompletableFuture<Void> future = new CompletableFuture<>();
2389    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2390      if (err != null) {
2391        future.completeExceptionally(err);
2392        return;
2393      }
2394      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2395        future.complete(null);
2396        return;
2397      }
2398      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2399        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2400          if (e != null) {
2401            future.completeExceptionally(e);
2402          } else {
2403            future.complete(v);
2404          }
2405        });
2406    });
2407    return future;
2408  }
2409
2410  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2411    return this.<Void> newMasterCaller()
2412      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2413        controller, stub,
2414        DeleteSnapshotRequest.newBuilder()
2415          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2416        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2417      .call();
2418  }
2419
2420  @Override
2421  public CompletableFuture<Void> execProcedure(String signature, String instance,
2422    Map<String, String> props) {
2423    CompletableFuture<Void> future = new CompletableFuture<>();
2424    ProcedureDescription procDesc =
2425      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2426    addListener(this.<Long> newMasterCaller()
2427      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2428        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2429        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2430      .call(), (expectedTimeout, err) -> {
2431        if (err != null) {
2432          future.completeExceptionally(err);
2433          return;
2434        }
2435        TimerTask pollingTask = new TimerTask() {
2436          int tries = 0;
2437          long startTime = EnvironmentEdgeManager.currentTime();
2438          long endTime = startTime + expectedTimeout;
2439          long maxPauseTime = expectedTimeout / maxAttempts;
2440
2441          @Override
2442          public void run(Timeout timeout) throws Exception {
2443            if (EnvironmentEdgeManager.currentTime() < endTime) {
2444              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2445                if (err2 != null) {
2446                  future.completeExceptionally(err2);
2447                  return;
2448                }
2449                if (done) {
2450                  future.complete(null);
2451                } else {
2452                  // retry again after pauseTime.
2453                  long pauseTime =
2454                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2455                  pauseTime = Math.min(pauseTime, maxPauseTime);
2456                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2457                    TimeUnit.MICROSECONDS);
2458                }
2459              });
2460            } else {
2461              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2462                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2463            }
2464          }
2465        };
2466        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2467        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2468      });
2469    return future;
2470  }
2471
2472  @Override
2473  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2474    Map<String, String> props) {
2475    ProcedureDescription proDesc =
2476      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2477    return this.<byte[]> newMasterCaller()
2478      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2479        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2480        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2481        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2482      .call();
2483  }
2484
2485  @Override
2486  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2487    Map<String, String> props) {
2488    ProcedureDescription proDesc =
2489      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2490    return this.<Boolean> newMasterCaller()
2491      .action(
2492        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2493          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2494          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2495      .call();
2496  }
2497
2498  @Override
2499  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2500    return this.<Boolean> newMasterCaller().action(
2501      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2502        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2503        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2504      .call();
2505  }
2506
2507  @Override
2508  public CompletableFuture<String> getProcedures() {
2509    return this.<String> newMasterCaller()
2510      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2511        controller, stub, GetProceduresRequest.newBuilder().build(),
2512        (s, c, req, done) -> s.getProcedures(c, req, done),
2513        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2514      .call();
2515  }
2516
2517  @Override
2518  public CompletableFuture<String> getLocks() {
2519    return this.<String> newMasterCaller()
2520      .action(
2521        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2522          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2523          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2524      .call();
2525  }
2526
2527  @Override
2528  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2529    boolean offload) {
2530    return this.<Void> newMasterCaller()
2531      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2532        DecommissionRegionServersResponse, Void> call(controller, stub,
2533          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2534          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2535      .call();
2536  }
2537
2538  @Override
2539  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2540    return this.<List<ServerName>> newMasterCaller()
2541      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2542        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2543          ListDecommissionedRegionServersRequest.newBuilder().build(),
2544          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2545          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2546            .collect(Collectors.toList())))
2547      .call();
2548  }
2549
2550  @Override
2551  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2552    List<byte[]> encodedRegionNames) {
2553    return this.<Void> newMasterCaller()
2554      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2555        RecommissionRegionServerResponse, Void> call(controller, stub,
2556          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2557          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2558      .call();
2559  }
2560
2561  /**
2562   * Get the region location for the passed region name. The region name may be a full region name
2563   * or encoded region name. If the region does not found, then it'll throw an
2564   * UnknownRegionException wrapped by a {@link CompletableFuture}
2565   * @param regionNameOrEncodedRegionName region name or encoded region name
2566   * @return region location, wrapped by a {@link CompletableFuture}
2567   */
2568  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2569    if (regionNameOrEncodedRegionName == null) {
2570      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2571    }
2572
2573    CompletableFuture<Optional<HRegionLocation>> future;
2574    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2575      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2576      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2577        // old format encodedName, should be meta region
2578        future = connection.registry.getMetaRegionLocations()
2579          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2580            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2581      } else {
2582        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2583          regionNameOrEncodedRegionName);
2584      }
2585    } else {
2586      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2587      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2588      RegionInfo regionInfo;
2589      try {
2590        regionInfo =
2591          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2592      } catch (IOException ioe) {
2593        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2594      }
2595
2596      if (regionInfo.isMetaRegion()) {
2597        future = connection.registry.getMetaRegionLocations()
2598          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2599            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2600            .findFirst());
2601      } else {
2602        future =
2603          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2604      }
2605    }
2606
2607    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2608    addListener(future, (location, err) -> {
2609      if (err != null) {
2610        returnedFuture.completeExceptionally(err);
2611        return;
2612      }
2613      if (!location.isPresent() || location.get().getRegion() == null) {
2614        returnedFuture.completeExceptionally(
2615          new UnknownRegionException("Invalid region name or encoded region name: "
2616            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2617      } else {
2618        returnedFuture.complete(location.get());
2619      }
2620    });
2621    return returnedFuture;
2622  }
2623
2624  /**
2625   * Get the region info for the passed region name. The region name may be a full region name or
2626   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2627   * wrapped by a {@link CompletableFuture}
2628   * @return region info, wrapped by a {@link CompletableFuture}
2629   */
2630  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2631    if (regionNameOrEncodedRegionName == null) {
2632      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2633    }
2634
2635    if (
2636      Bytes.equals(regionNameOrEncodedRegionName,
2637        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2638        || Bytes.equals(regionNameOrEncodedRegionName,
2639          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2640    ) {
2641      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2642    }
2643
2644    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2645    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2646      if (err != null) {
2647        future.completeExceptionally(err);
2648      } else {
2649        future.complete(location.getRegion());
2650      }
2651    });
2652    return future;
2653  }
2654
2655  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2656    if (numRegions < 3) {
2657      throw new IllegalArgumentException("Must create at least three regions");
2658    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2659      throw new IllegalArgumentException("Start key must be smaller than end key");
2660    }
2661    if (numRegions == 3) {
2662      return new byte[][] { startKey, endKey };
2663    }
2664    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2665    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2666      throw new IllegalArgumentException("Unable to split key range into enough regions");
2667    }
2668    return splitKeys;
2669  }
2670
2671  private void verifySplitKeys(byte[][] splitKeys) {
2672    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2673    // Verify there are no duplicate split keys
2674    byte[] lastKey = null;
2675    for (byte[] splitKey : splitKeys) {
2676      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2677        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2678      }
2679      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2680        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2681          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2682      }
2683      lastKey = splitKey;
2684    }
2685  }
2686
2687  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2688
2689    abstract void onFinished();
2690
2691    abstract void onError(Throwable error);
2692
2693    @Override
2694    public void accept(Void v, Throwable error) {
2695      if (error != null) {
2696        onError(error);
2697        return;
2698      }
2699      onFinished();
2700    }
2701  }
2702
2703  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2704    protected final TableName tableName;
2705
2706    TableProcedureBiConsumer(TableName tableName) {
2707      this.tableName = tableName;
2708    }
2709
2710    abstract String getOperationType();
2711
2712    String getDescription() {
2713      return "Operation: " + getOperationType() + ", " + "Table Name: "
2714        + tableName.getNameWithNamespaceInclAsString();
2715    }
2716
2717    @Override
2718    void onFinished() {
2719      LOG.info(getDescription() + " completed");
2720    }
2721
2722    @Override
2723    void onError(Throwable error) {
2724      LOG.info(getDescription() + " failed with " + error.getMessage());
2725    }
2726  }
2727
2728  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2729    protected final String namespaceName;
2730
2731    NamespaceProcedureBiConsumer(String namespaceName) {
2732      this.namespaceName = namespaceName;
2733    }
2734
2735    abstract String getOperationType();
2736
2737    String getDescription() {
2738      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2739    }
2740
2741    @Override
2742    void onFinished() {
2743      LOG.info(getDescription() + " completed");
2744    }
2745
2746    @Override
2747    void onError(Throwable error) {
2748      LOG.info(getDescription() + " failed with " + error.getMessage());
2749    }
2750  }
2751
2752  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2753
2754    CreateTableProcedureBiConsumer(TableName tableName) {
2755      super(tableName);
2756    }
2757
2758    @Override
2759    String getOperationType() {
2760      return "CREATE";
2761    }
2762  }
2763
2764  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2765
2766    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2767      super(tableName);
2768    }
2769
2770    @Override
2771    String getOperationType() {
2772      return "MODIFY";
2773    }
2774  }
2775
2776  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2777    extends TableProcedureBiConsumer {
2778
2779    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2780      super(tableName);
2781    }
2782
2783    @Override
2784    String getOperationType() {
2785      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2786    }
2787  }
2788
2789  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2790
2791    DeleteTableProcedureBiConsumer(TableName tableName) {
2792      super(tableName);
2793    }
2794
2795    @Override
2796    String getOperationType() {
2797      return "DELETE";
2798    }
2799
2800    @Override
2801    void onFinished() {
2802      connection.getLocator().clearCache(this.tableName);
2803      super.onFinished();
2804    }
2805  }
2806
2807  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2808
2809    TruncateTableProcedureBiConsumer(TableName tableName) {
2810      super(tableName);
2811    }
2812
2813    @Override
2814    String getOperationType() {
2815      return "TRUNCATE";
2816    }
2817  }
2818
2819  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2820
2821    EnableTableProcedureBiConsumer(TableName tableName) {
2822      super(tableName);
2823    }
2824
2825    @Override
2826    String getOperationType() {
2827      return "ENABLE";
2828    }
2829  }
2830
2831  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2832
2833    DisableTableProcedureBiConsumer(TableName tableName) {
2834      super(tableName);
2835    }
2836
2837    @Override
2838    String getOperationType() {
2839      return "DISABLE";
2840    }
2841  }
2842
2843  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2844
2845    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2846      super(tableName);
2847    }
2848
2849    @Override
2850    String getOperationType() {
2851      return "ADD_COLUMN_FAMILY";
2852    }
2853  }
2854
2855  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2856
2857    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2858      super(tableName);
2859    }
2860
2861    @Override
2862    String getOperationType() {
2863      return "DELETE_COLUMN_FAMILY";
2864    }
2865  }
2866
2867  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2868
2869    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2870      super(tableName);
2871    }
2872
2873    @Override
2874    String getOperationType() {
2875      return "MODIFY_COLUMN_FAMILY";
2876    }
2877  }
2878
2879  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2880    extends TableProcedureBiConsumer {
2881
2882    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2883      super(tableName);
2884    }
2885
2886    @Override
2887    String getOperationType() {
2888      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2889    }
2890  }
2891
2892  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2893
2894    FlushTableProcedureBiConsumer(TableName tableName) {
2895      super(tableName);
2896    }
2897
2898    @Override
2899    String getOperationType() {
2900      return "FLUSH";
2901    }
2902  }
2903
2904  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2905
2906    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2907      super(namespaceName);
2908    }
2909
2910    @Override
2911    String getOperationType() {
2912      return "CREATE_NAMESPACE";
2913    }
2914  }
2915
2916  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2917
2918    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2919      super(namespaceName);
2920    }
2921
2922    @Override
2923    String getOperationType() {
2924      return "DELETE_NAMESPACE";
2925    }
2926  }
2927
2928  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2929
2930    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2931      super(namespaceName);
2932    }
2933
2934    @Override
2935    String getOperationType() {
2936      return "MODIFY_NAMESPACE";
2937    }
2938  }
2939
2940  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2941
2942    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2943      super(tableName);
2944    }
2945
2946    @Override
2947    String getOperationType() {
2948      return "MERGE_REGIONS";
2949    }
2950  }
2951
2952  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2953
2954    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2955      super(tableName);
2956    }
2957
2958    @Override
2959    String getOperationType() {
2960      return "SPLIT_REGION";
2961    }
2962  }
2963
2964  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2965
2966    TruncateRegionProcedureBiConsumer(TableName tableName) {
2967      super(tableName);
2968    }
2969
2970    @Override
2971    String getOperationType() {
2972      return "TRUNCATE_REGION";
2973    }
2974  }
2975
2976  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2977    SnapshotProcedureBiConsumer(TableName tableName) {
2978      super(tableName);
2979    }
2980
2981    @Override
2982    String getOperationType() {
2983      return "SNAPSHOT";
2984    }
2985  }
2986
2987  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2988    private final String peerId;
2989    private final Supplier<String> getOperation;
2990
2991    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2992      this.peerId = peerId;
2993      this.getOperation = getOperation;
2994    }
2995
2996    String getDescription() {
2997      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2998    }
2999
3000    @Override
3001    void onFinished() {
3002      LOG.info(getDescription() + " completed");
3003    }
3004
3005    @Override
3006    void onError(Throwable error) {
3007      LOG.info(getDescription() + " failed with " + error.getMessage());
3008    }
3009  }
3010
3011  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
3012    CompletableFuture<Void> future = new CompletableFuture<>();
3013    addListener(procFuture, (procId, error) -> {
3014      if (error != null) {
3015        future.completeExceptionally(error);
3016        return;
3017      }
3018      getProcedureResult(procId, future, 0);
3019    });
3020    return future;
3021  }
3022
3023  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
3024    addListener(
3025      this.<GetProcedureResultResponse> newMasterCaller()
3026        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3027          GetProcedureResultResponse> call(controller, stub,
3028            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3029            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3030        .call(),
3031      (response, error) -> {
3032        if (error != null) {
3033          LOG.warn("failed to get the procedure result procId={}", procId,
3034            ConnectionUtils.translateException(error));
3035          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3036            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3037          return;
3038        }
3039        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3040          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3041            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3042          return;
3043        }
3044        if (response.hasException()) {
3045          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3046          future.completeExceptionally(ioe);
3047        } else {
3048          future.complete(null);
3049        }
3050      });
3051  }
3052
3053  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3054    CompletableFuture<T> future = new CompletableFuture<>();
3055    future.completeExceptionally(error);
3056    return future;
3057  }
3058
3059  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3060    if (error != null) {
3061      future.completeExceptionally(error);
3062      return true;
3063    }
3064    return false;
3065  }
3066
3067  @Override
3068  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3069    return getClusterMetrics(EnumSet.allOf(Option.class));
3070  }
3071
3072  @Override
3073  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3074    return this.<ClusterMetrics> newMasterCaller()
3075      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3076        ClusterMetrics> call(controller, stub,
3077          RequestConverter.buildGetClusterStatusRequest(options),
3078          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3079          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3080      .call();
3081  }
3082
3083  @Override
3084  public CompletableFuture<Void> shutdown() {
3085    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3086      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3087        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3088        resp -> null))
3089      .call();
3090  }
3091
3092  @Override
3093  public CompletableFuture<Void> stopMaster() {
3094    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3095      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3096        controller, stub, StopMasterRequest.newBuilder().build(),
3097        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3098      .call();
3099  }
3100
3101  @Override
3102  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3103    StopServerRequest request = RequestConverter
3104      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3105    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3106      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3107        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3108        resp -> null))
3109      .serverName(serverName).call();
3110  }
3111
3112  @Override
3113  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3114    return this.<Void> newAdminCaller()
3115      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3116        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3117          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3118      .serverName(serverName).call();
3119  }
3120
3121  @Override
3122  public CompletableFuture<Void> updateConfiguration() {
3123    CompletableFuture<Void> future = new CompletableFuture<Void>();
3124    addListener(
3125      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3126      (status, err) -> {
3127        if (err != null) {
3128          future.completeExceptionally(err);
3129        } else {
3130          List<CompletableFuture<Void>> futures = new ArrayList<>();
3131          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3132          futures.add(updateConfiguration(status.getMasterName()));
3133          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3134          addListener(
3135            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3136            (result, err2) -> {
3137              if (err2 != null) {
3138                future.completeExceptionally(err2);
3139              } else {
3140                future.complete(result);
3141              }
3142            });
3143        }
3144      });
3145    return future;
3146  }
3147
3148  @Override
3149  public CompletableFuture<Void> updateConfiguration(String groupName) {
3150    CompletableFuture<Void> future = new CompletableFuture<Void>();
3151    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3152      if (err != null) {
3153        future.completeExceptionally(err);
3154      } else if (rsGroupInfo == null) {
3155        future.completeExceptionally(
3156          new IllegalArgumentException("Group does not exist: " + groupName));
3157      } else {
3158        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3159          if (err2 != null) {
3160            future.completeExceptionally(err2);
3161          } else {
3162            List<CompletableFuture<Void>> futures = new ArrayList<>();
3163            List<ServerName> groupServers = status.getServersName().stream()
3164              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3165            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3166            addListener(
3167              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3168              (result, err3) -> {
3169                if (err3 != null) {
3170                  future.completeExceptionally(err3);
3171                } else {
3172                  future.complete(result);
3173                }
3174              });
3175          }
3176        });
3177      }
3178    });
3179    return future;
3180  }
3181
3182  @Override
3183  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3184    return this.<Void> newAdminCaller()
3185      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3186        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3187          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3188      .serverName(serverName).call();
3189  }
3190
3191  @Override
3192  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3193    return this.<Void> newAdminCaller()
3194      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3195        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3196          RequestConverter.buildClearCompactionQueuesRequest(queues),
3197          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3198      .serverName(serverName).call();
3199  }
3200
3201  @Override
3202  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3203    return this.<List<SecurityCapability>> newMasterCaller()
3204      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3205        List<SecurityCapability>> call(controller, stub,
3206          SecurityCapabilitiesRequest.newBuilder().build(),
3207          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3208          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3209      .call();
3210  }
3211
3212  @Override
3213  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3214    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3215  }
3216
3217  @Override
3218  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3219    TableName tableName) {
3220    Preconditions.checkNotNull(tableName,
3221      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3222    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3223  }
3224
3225  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3226    ServerName serverName) {
3227    return this.<List<RegionMetrics>> newAdminCaller()
3228      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3229        List<RegionMetrics>> adminCall(controller, stub, request,
3230          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3231          RegionMetricsBuilder::toRegionMetrics))
3232      .serverName(serverName).call();
3233  }
3234
3235  @Override
3236  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3237    return this.<Boolean> newMasterCaller()
3238      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3239        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3240          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3241          resp -> resp.getInMaintenanceMode()))
3242      .call();
3243  }
3244
3245  @Override
3246  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3247    CompactType compactType) {
3248    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3249
3250    switch (compactType) {
3251      case MOB:
3252        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3253          if (err != null) {
3254            future.completeExceptionally(err);
3255            return;
3256          }
3257          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3258
3259          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3260            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3261              GetRegionInfoResponse> adminCall(controller, stub,
3262                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3263                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3264            .call(), (resp2, err2) -> {
3265              if (err2 != null) {
3266                future.completeExceptionally(err2);
3267              } else {
3268                if (resp2.hasCompactionState()) {
3269                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3270                } else {
3271                  future.complete(CompactionState.NONE);
3272                }
3273              }
3274            });
3275        });
3276        break;
3277      case NORMAL:
3278        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3279          if (err != null) {
3280            future.completeExceptionally(err);
3281            return;
3282          }
3283          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3284          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3285          locations.stream().filter(loc -> loc.getServerName() != null)
3286            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3287            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3288              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3289                // If any region compaction state is MAJOR_AND_MINOR
3290                // the table compaction state is MAJOR_AND_MINOR, too.
3291                if (err2 != null) {
3292                  future.completeExceptionally(unwrapCompletionException(err2));
3293                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3294                  future.complete(regionState);
3295                } else {
3296                  regionStates.add(regionState);
3297                }
3298              }));
3299            });
3300          addListener(
3301            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3302            (ret, err3) -> {
3303              // If future not completed, check all regions's compaction state
3304              if (!future.isCompletedExceptionally() && !future.isDone()) {
3305                CompactionState state = CompactionState.NONE;
3306                for (CompactionState regionState : regionStates) {
3307                  switch (regionState) {
3308                    case MAJOR:
3309                      if (state == CompactionState.MINOR) {
3310                        future.complete(CompactionState.MAJOR_AND_MINOR);
3311                      } else {
3312                        state = CompactionState.MAJOR;
3313                      }
3314                      break;
3315                    case MINOR:
3316                      if (state == CompactionState.MAJOR) {
3317                        future.complete(CompactionState.MAJOR_AND_MINOR);
3318                      } else {
3319                        state = CompactionState.MINOR;
3320                      }
3321                      break;
3322                    case NONE:
3323                    default:
3324                  }
3325                }
3326                if (!future.isDone()) {
3327                  future.complete(state);
3328                }
3329              }
3330            });
3331        });
3332        break;
3333      default:
3334        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3335    }
3336
3337    return future;
3338  }
3339
3340  @Override
3341  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3342    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3343    addListener(getRegionLocation(regionName), (location, err) -> {
3344      if (err != null) {
3345        future.completeExceptionally(err);
3346        return;
3347      }
3348      ServerName serverName = location.getServerName();
3349      if (serverName == null) {
3350        future
3351          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3352        return;
3353      }
3354      addListener(
3355        this.<GetRegionInfoResponse> newAdminCaller()
3356          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3357            GetRegionInfoResponse> adminCall(controller, stub,
3358              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3359                true),
3360              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3361          .serverName(serverName).call(),
3362        (resp2, err2) -> {
3363          if (err2 != null) {
3364            future.completeExceptionally(err2);
3365          } else {
3366            if (resp2.hasCompactionState()) {
3367              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3368            } else {
3369              future.complete(CompactionState.NONE);
3370            }
3371          }
3372        });
3373    });
3374    return future;
3375  }
3376
3377  @Override
3378  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3379    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3380      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3381    return this.<Optional<Long>> newMasterCaller()
3382      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3383        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3384          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3385          ProtobufUtil::toOptionalTimestamp))
3386      .call();
3387  }
3388
3389  @Override
3390  public CompletableFuture<Optional<Long>>
3391    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3392    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3393    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3394    addListener(getRegionInfo(regionName), (region, err) -> {
3395      if (err != null) {
3396        future.completeExceptionally(err);
3397        return;
3398      }
3399      MajorCompactionTimestampForRegionRequest.Builder builder =
3400        MajorCompactionTimestampForRegionRequest.newBuilder();
3401      builder.setRegion(
3402        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3403      addListener(this.<Optional<Long>> newMasterCaller()
3404        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3405          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3406            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3407            ProtobufUtil::toOptionalTimestamp))
3408        .call(), (timestamp, err2) -> {
3409          if (err2 != null) {
3410            future.completeExceptionally(err2);
3411          } else {
3412            future.complete(timestamp);
3413          }
3414        });
3415    });
3416    return future;
3417  }
3418
3419  @Override
3420  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3421    List<String> serverNamesList) {
3422    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3423    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3424      if (err != null) {
3425        future.completeExceptionally(err);
3426        return;
3427      }
3428      // Accessed by multiple threads.
3429      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3430      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3431      serverNames.stream().forEach(serverName -> {
3432        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3433          if (err2 != null) {
3434            future.completeExceptionally(unwrapCompletionException(err2));
3435          } else {
3436            serverStates.put(serverName, serverState);
3437          }
3438        }));
3439      });
3440      addListener(
3441        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3442        (ret, err3) -> {
3443          if (!future.isCompletedExceptionally()) {
3444            if (err3 != null) {
3445              future.completeExceptionally(err3);
3446            } else {
3447              future.complete(serverStates);
3448            }
3449          }
3450        });
3451    });
3452    return future;
3453  }
3454
3455  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3456    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3457    if (serverNamesList.isEmpty()) {
3458      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3459        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3460      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3461        if (err != null) {
3462          future.completeExceptionally(err);
3463        } else {
3464          future.complete(clusterMetrics.getServersName());
3465        }
3466      });
3467      return future;
3468    } else {
3469      List<ServerName> serverList = new ArrayList<>();
3470      for (String regionServerName : serverNamesList) {
3471        ServerName serverName = null;
3472        try {
3473          serverName = ServerName.valueOf(regionServerName);
3474        } catch (Exception e) {
3475          future.completeExceptionally(
3476            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3477        }
3478        if (serverName == null) {
3479          future.completeExceptionally(
3480            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3481        } else {
3482          serverList.add(serverName);
3483        }
3484      }
3485      future.complete(serverList);
3486    }
3487    return future;
3488  }
3489
3490  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3491    return this.<Boolean> newAdminCaller().serverName(serverName)
3492      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3493        Boolean> adminCall(controller, stub,
3494          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3495          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3496      .call();
3497  }
3498
3499  @Override
3500  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3501    return this.<Boolean> newMasterCaller()
3502      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3503        Boolean> call(controller, stub,
3504          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3505          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3506          (resp) -> resp.getPrevBalanceValue()))
3507      .call();
3508  }
3509
3510  @Override
3511  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3512    return this.<BalanceResponse> newMasterCaller()
3513      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3514        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3515          (s, c, req, done) -> s.balance(c, req, done),
3516          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3517      .call();
3518  }
3519
3520  @Override
3521  public CompletableFuture<Boolean> isBalancerEnabled() {
3522    return this.<Boolean> newMasterCaller()
3523      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3524        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3525          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3526      .call();
3527  }
3528
3529  @Override
3530  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3531    return this.<Boolean> newMasterCaller()
3532      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3533        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3534          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3535          (resp) -> resp.getPrevNormalizerValue()))
3536      .call();
3537  }
3538
3539  @Override
3540  public CompletableFuture<Boolean> isNormalizerEnabled() {
3541    return this.<Boolean> newMasterCaller()
3542      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3543        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3544          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3545      .call();
3546  }
3547
3548  @Override
3549  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3550    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3551  }
3552
3553  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3554    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3555      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3556  }
3557
3558  @Override
3559  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3560    return this.<Boolean> newMasterCaller()
3561      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3562        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3563          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3564          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3565          (resp) -> resp.getPrevValue()))
3566      .call();
3567  }
3568
3569  @Override
3570  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3571    return this.<Boolean> newMasterCaller()
3572      .action(
3573        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3574          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3575            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3576      .call();
3577  }
3578
3579  @Override
3580  public CompletableFuture<Boolean> runCleanerChore() {
3581    return this.<Boolean> newMasterCaller()
3582      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3583        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3584          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3585          (resp) -> resp.getCleanerChoreRan()))
3586      .call();
3587  }
3588
3589  @Override
3590  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3591    return this.<Boolean> newMasterCaller()
3592      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3593        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3594          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3595      .call();
3596  }
3597
3598  @Override
3599  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3600    return this.<Boolean> newMasterCaller()
3601      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3602        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3603          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3604          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3605      .call();
3606  }
3607
3608  @Override
3609  public CompletableFuture<Integer> runCatalogJanitor() {
3610    return this.<Integer> newMasterCaller()
3611      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3612        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3613          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3614      .call();
3615  }
3616
3617  @Override
3618  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3619    ServiceCaller<S, R> callable) {
3620    MasterCoprocessorRpcChannelImpl channel =
3621      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3622    S stub = stubMaker.apply(channel);
3623    CompletableFuture<R> future = new CompletableFuture<>();
3624    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3625    callable.call(stub, controller, resp -> {
3626      if (controller.failed()) {
3627        future.completeExceptionally(controller.getFailed());
3628      } else {
3629        future.complete(resp);
3630      }
3631    });
3632    return future;
3633  }
3634
3635  @Override
3636  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3637    ServiceCaller<S, R> callable, ServerName serverName) {
3638    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3639      this.<Message> newServerCaller().serverName(serverName));
3640    S stub = stubMaker.apply(channel);
3641    CompletableFuture<R> future = new CompletableFuture<>();
3642    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3643    callable.call(stub, controller, resp -> {
3644      if (controller.failed()) {
3645        future.completeExceptionally(controller.getFailed());
3646      } else {
3647        future.complete(resp);
3648      }
3649    });
3650    return future;
3651  }
3652
3653  @Override
3654  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3655    return this.<List<ServerName>> newMasterCaller()
3656      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3657        List<ServerName>> call(controller, stub,
3658          RequestConverter.buildClearDeadServersRequest(servers),
3659          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3660          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3661      .call();
3662  }
3663
3664  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3665    return this.connection.callerFactory.<T> serverRequest()
3666      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3667      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3668      .pause(pauseNs, TimeUnit.NANOSECONDS)
3669      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3670      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3671  }
3672
3673  @Override
3674  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3675    if (tableName == null) {
3676      return failedFuture(new IllegalArgumentException("Table name is null"));
3677    }
3678    CompletableFuture<Void> future = new CompletableFuture<>();
3679    addListener(tableExists(tableName), (exist, err) -> {
3680      if (err != null) {
3681        future.completeExceptionally(err);
3682        return;
3683      }
3684      if (!exist) {
3685        future.completeExceptionally(new TableNotFoundException(
3686          "Table '" + tableName.getNameAsString() + "' does not exists."));
3687        return;
3688      }
3689      addListener(getTableSplits(tableName), (splits, err1) -> {
3690        if (err1 != null) {
3691          future.completeExceptionally(err1);
3692        } else {
3693          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3694            if (err2 != null) {
3695              future.completeExceptionally(err2);
3696            } else {
3697              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3698                if (err3 != null) {
3699                  future.completeExceptionally(err3);
3700                } else {
3701                  future.complete(result3);
3702                }
3703              });
3704            }
3705          });
3706        }
3707      });
3708    });
3709    return future;
3710  }
3711
3712  @Override
3713  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3714    if (tableName == null) {
3715      return failedFuture(new IllegalArgumentException("Table name is null"));
3716    }
3717    CompletableFuture<Void> future = new CompletableFuture<>();
3718    addListener(tableExists(tableName), (exist, err) -> {
3719      if (err != null) {
3720        future.completeExceptionally(err);
3721        return;
3722      }
3723      if (!exist) {
3724        future.completeExceptionally(new TableNotFoundException(
3725          "Table '" + tableName.getNameAsString() + "' does not exists."));
3726        return;
3727      }
3728      addListener(setTableReplication(tableName, false), (result, err2) -> {
3729        if (err2 != null) {
3730          future.completeExceptionally(err2);
3731        } else {
3732          future.complete(result);
3733        }
3734      });
3735    });
3736    return future;
3737  }
3738
3739  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3740    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3741    addListener(
3742      getRegions(tableName).thenApply(regions -> regions.stream()
3743        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3744      (regions, err2) -> {
3745        if (err2 != null) {
3746          future.completeExceptionally(err2);
3747          return;
3748        }
3749        if (regions.size() == 1) {
3750          future.complete(null);
3751        } else {
3752          byte[][] splits = new byte[regions.size() - 1][];
3753          for (int i = 1; i < regions.size(); i++) {
3754            splits[i - 1] = regions.get(i).getStartKey();
3755          }
3756          future.complete(splits);
3757        }
3758      });
3759    return future;
3760  }
3761
3762  /**
3763   * Connect to peer and check the table descriptor on peer:
3764   * <ol>
3765   * <li>Create the same table on peer when not exist.</li>
3766   * <li>Throw an exception if the table already has replication enabled on any of the column
3767   * families.</li>
3768   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3769   * </ol>
3770   * @param tableName name of the table to sync to the peer
3771   * @param splits    table split keys
3772   */
3773  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3774    byte[][] splits) {
3775    CompletableFuture<Void> future = new CompletableFuture<>();
3776    addListener(listReplicationPeers(), (peers, err) -> {
3777      if (err != null) {
3778        future.completeExceptionally(err);
3779        return;
3780      }
3781      if (peers == null || peers.size() <= 0) {
3782        future.completeExceptionally(
3783          new IllegalArgumentException("Found no peer cluster for replication."));
3784        return;
3785      }
3786      List<CompletableFuture<Void>> futures = new ArrayList<>();
3787      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3788        .forEach(peer -> {
3789          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3790        });
3791      addListener(
3792        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3793        (result, err2) -> {
3794          if (err2 != null) {
3795            future.completeExceptionally(err2);
3796          } else {
3797            future.complete(result);
3798          }
3799        });
3800    });
3801    return future;
3802  }
3803
3804  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3805    ReplicationPeerDescription peer) {
3806    Configuration peerConf;
3807    try {
3808      peerConf = ReplicationPeerConfigUtil
3809        .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
3810    } catch (IOException e) {
3811      return failedFuture(e);
3812    }
3813    URI connectionUri =
3814      ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
3815    CompletableFuture<Void> future = new CompletableFuture<>();
3816    addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
3817      if (err != null) {
3818        future.completeExceptionally(err);
3819        return;
3820      }
3821      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3822        if (err1 != null) {
3823          future.completeExceptionally(err1);
3824          return;
3825        }
3826        AsyncAdmin peerAdmin = conn.getAdmin();
3827        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3828          if (err2 != null) {
3829            future.completeExceptionally(err2);
3830            return;
3831          }
3832          if (!exist) {
3833            CompletableFuture<Void> createTableFuture = null;
3834            if (splits == null) {
3835              createTableFuture = peerAdmin.createTable(tableDesc);
3836            } else {
3837              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3838            }
3839            addListener(createTableFuture, (result, err3) -> {
3840              if (err3 != null) {
3841                future.completeExceptionally(err3);
3842              } else {
3843                future.complete(result);
3844              }
3845            });
3846          } else {
3847            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3848              (result, err4) -> {
3849                if (err4 != null) {
3850                  future.completeExceptionally(err4);
3851                } else {
3852                  future.complete(result);
3853                }
3854              });
3855          }
3856        });
3857      });
3858    });
3859    return future;
3860  }
3861
3862  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3863    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3864    CompletableFuture<Void> future = new CompletableFuture<>();
3865    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3866      if (err != null) {
3867        future.completeExceptionally(err);
3868        return;
3869      }
3870      if (peerTableDesc == null) {
3871        future.completeExceptionally(
3872          new IllegalArgumentException("Failed to get table descriptor for table "
3873            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3874        return;
3875      }
3876      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3877        future.completeExceptionally(new IllegalArgumentException(
3878          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3879            + ", but the table descriptors are not same when compared with source cluster."
3880            + " Thus can not enable the table's replication switch."));
3881        return;
3882      }
3883      future.complete(null);
3884    });
3885    return future;
3886  }
3887
3888  /**
3889   * Set the table's replication switch if the table's replication switch is already not set.
3890   * @param tableName name of the table
3891   * @param enableRep is replication switch enable or disable
3892   */
3893  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3894    CompletableFuture<Void> future = new CompletableFuture<>();
3895    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3896      if (err != null) {
3897        future.completeExceptionally(err);
3898        return;
3899      }
3900      if (!tableDesc.matchReplicationScope(enableRep)) {
3901        int scope =
3902          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3903        TableDescriptor newTableDesc =
3904          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3905        addListener(modifyTable(newTableDesc), (result, err2) -> {
3906          if (err2 != null) {
3907            future.completeExceptionally(err2);
3908          } else {
3909            future.complete(result);
3910          }
3911        });
3912      } else {
3913        future.complete(null);
3914      }
3915    });
3916    return future;
3917  }
3918
3919  @Override
3920  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
3921    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
3922    request.setPeerId(peerId);
3923    return this.<Boolean> newMasterCaller()
3924      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
3925        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
3926          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
3927          resp -> resp.getIsEnabled()))
3928      .call();
3929  }
3930
3931  private void waitUntilAllReplicationPeerModificationProceduresDone(
3932    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
3933    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
3934      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
3935        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
3936          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
3937            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
3938            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
3939            resp -> resp.getProcedureList()))
3940        .call();
3941    addListener(callFuture, (r, e) -> {
3942      if (e != null) {
3943        future.completeExceptionally(e);
3944      } else if (r.isEmpty()) {
3945        // we are done
3946        future.complete(prevOn);
3947      } else {
3948        // retry later to see if the procedures are done
3949        retryTimer.newTimeout(
3950          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
3951          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3952      }
3953    });
3954  }
3955
3956  @Override
3957  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
3958    boolean drainProcedures) {
3959    ReplicationPeerModificationSwitchRequest request =
3960      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
3961    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
3962      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
3963        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
3964          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
3965          resp -> resp.getPreviousValue()))
3966      .call();
3967    // if we do not need to wait all previous peer modification procedure done, or we are enabling
3968    // peer modification, just return here.
3969    if (!drainProcedures || on) {
3970      return callFuture;
3971    }
3972    // otherwise we need to wait until all previous peer modification procedure done
3973    CompletableFuture<Boolean> future = new CompletableFuture<>();
3974    addListener(callFuture, (prevOn, err) -> {
3975      if (err != null) {
3976        future.completeExceptionally(err);
3977        return;
3978      }
3979      // even if the previous state is disabled, we still need to wait here, as there could be
3980      // another client thread which called this method just before us and have already changed the
3981      // state to off, but there are still peer modification procedures not finished, so we should
3982      // also wait here.
3983      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
3984    });
3985    return future;
3986  }
3987
3988  @Override
3989  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
3990    return this.<Boolean> newMasterCaller()
3991      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
3992        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
3993          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
3994          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
3995          (resp) -> resp.getEnabled()))
3996      .call();
3997  }
3998
3999  @Override
4000  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
4001    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
4002    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
4003      if (err != null) {
4004        future.completeExceptionally(err);
4005        return;
4006      }
4007      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
4008        locations.stream().filter(l -> l.getRegion() != null)
4009          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
4010          .collect(Collectors.groupingBy(l -> l.getServerName(),
4011            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
4012      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
4013      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
4014      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
4015        futures
4016          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
4017            if (err2 != null) {
4018              future.completeExceptionally(unwrapCompletionException(err2));
4019            } else {
4020              aggregator.append(stats);
4021            }
4022          }));
4023      }
4024      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4025        (ret, err3) -> {
4026          if (err3 != null) {
4027            future.completeExceptionally(unwrapCompletionException(err3));
4028          } else {
4029            future.complete(aggregator.sum());
4030          }
4031        });
4032    });
4033    return future;
4034  }
4035
4036  @Override
4037  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4038    boolean preserveSplits) {
4039    CompletableFuture<Void> future = new CompletableFuture<>();
4040    addListener(tableExists(tableName), (exist, err) -> {
4041      if (err != null) {
4042        future.completeExceptionally(err);
4043        return;
4044      }
4045      if (!exist) {
4046        future.completeExceptionally(new TableNotFoundException(tableName));
4047        return;
4048      }
4049      addListener(tableExists(newTableName), (exist1, err1) -> {
4050        if (err1 != null) {
4051          future.completeExceptionally(err1);
4052          return;
4053        }
4054        if (exist1) {
4055          future.completeExceptionally(new TableExistsException(newTableName));
4056          return;
4057        }
4058        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4059          if (err2 != null) {
4060            future.completeExceptionally(err2);
4061            return;
4062          }
4063          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4064          if (preserveSplits) {
4065            addListener(getTableSplits(tableName), (splits, err3) -> {
4066              if (err3 != null) {
4067                future.completeExceptionally(err3);
4068              } else {
4069                addListener(
4070                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4071                  (result, err4) -> {
4072                    if (err4 != null) {
4073                      future.completeExceptionally(err4);
4074                    } else {
4075                      future.complete(result);
4076                    }
4077                  });
4078              }
4079            });
4080          } else {
4081            addListener(createTable(newTableDesc), (result, err5) -> {
4082              if (err5 != null) {
4083                future.completeExceptionally(err5);
4084              } else {
4085                future.complete(result);
4086              }
4087            });
4088          }
4089        });
4090      });
4091    });
4092    return future;
4093  }
4094
4095  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4096    List<RegionInfo> hris) {
4097    return this.<CacheEvictionStats> newAdminCaller()
4098      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4099        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4100          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4101          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4102          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4103      .serverName(serverName).call();
4104  }
4105
4106  @Override
4107  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4108    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4109      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4110        Boolean> call(controller, stub,
4111          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4112          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4113          resp -> resp.getPreviousRpcThrottleEnabled()))
4114      .call();
4115    return future;
4116  }
4117
4118  @Override
4119  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4120    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4121      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4122        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4123          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4124          resp -> resp.getRpcThrottleEnabled()))
4125      .call();
4126    return future;
4127  }
4128
4129  @Override
4130  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4131    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4132      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4133        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4134          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4135            .build(),
4136          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4137          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4138      .call();
4139    return future;
4140  }
4141
4142  @Override
4143  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4144    return this.<Map<TableName, Long>> newMasterCaller()
4145      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4146        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4147          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4148          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4149          resp -> resp.getSizesList().stream().collect(Collectors
4150            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4151      .call();
4152  }
4153
4154  @Override
4155  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4156    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4157    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4158      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4159        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4160          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4161          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4162          resp -> resp.getSnapshotsList().stream()
4163            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4164              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4165      .serverName(serverName).call();
4166  }
4167
4168  private CompletableFuture<SpaceQuotaSnapshot>
4169    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4170    return this.<SpaceQuotaSnapshot> newMasterCaller()
4171      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4172        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4173          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4174      .call();
4175  }
4176
4177  @Override
4178  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4179    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4180      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4181      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4182  }
4183
4184  @Override
4185  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4186    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4187    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4188      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4189      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4190  }
4191
4192  @Override
4193  public CompletableFuture<Void> grant(UserPermission userPermission,
4194    boolean mergeExistingPermissions) {
4195    return this.<Void> newMasterCaller()
4196      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4197        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4198        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4199      .call();
4200  }
4201
4202  @Override
4203  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4204    return this.<Void> newMasterCaller()
4205      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4206        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4207        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4208      .call();
4209  }
4210
4211  @Override
4212  public CompletableFuture<List<UserPermission>>
4213    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4214    return this.<List<UserPermission>> newMasterCaller()
4215      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4216        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4217          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4218          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4219          resp -> resp.getUserPermissionList().stream()
4220            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4221            .collect(Collectors.toList())))
4222      .call();
4223  }
4224
4225  @Override
4226  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4227    List<Permission> permissions) {
4228    return this.<List<Boolean>> newMasterCaller()
4229      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4230        List<Boolean>> call(controller, stub,
4231          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4232          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4233          resp -> resp.getHasUserPermissionList()))
4234      .call();
4235  }
4236
4237  @Override
4238  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4239    return this.<Boolean> newMasterCaller()
4240      .action((controller, stub) -> this.call(controller, stub,
4241        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4242        MasterService.Interface::switchSnapshotCleanup,
4243        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4244      .call();
4245  }
4246
4247  @Override
4248  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4249    return this.<Boolean> newMasterCaller()
4250      .action((controller, stub) -> this.call(controller, stub,
4251        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4252        MasterService.Interface::isSnapshotCleanupEnabled,
4253        IsSnapshotCleanupEnabledResponse::getEnabled))
4254      .call();
4255  }
4256
4257  @Override
4258  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4259    return this.<Void> newMasterCaller()
4260      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4261        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4262        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4263      .call();
4264  }
4265
4266  @Override
4267  public CompletableFuture<Void> addRSGroup(String groupName) {
4268    return this.<Void> newMasterCaller()
4269      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4270        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4271        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4272      .call();
4273  }
4274
4275  @Override
4276  public CompletableFuture<Void> removeRSGroup(String groupName) {
4277    return this.<Void> newMasterCaller()
4278      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4279        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4280        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4281      .call();
4282  }
4283
4284  @Override
4285  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4286    BalanceRequest request) {
4287    return this.<BalanceResponse> newMasterCaller()
4288      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4289        BalanceResponse> call(controller, stub,
4290          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4291          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4292      .call();
4293  }
4294
4295  @Override
4296  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4297    return this.<List<RSGroupInfo>> newMasterCaller()
4298      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4299        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4300          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4301            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4302      .call();
4303  }
4304
4305  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4306    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4307    final String logType) {
4308    if (CollectionUtils.isEmpty(serverNames)) {
4309      return CompletableFuture.completedFuture(Collections.emptyList());
4310    }
4311    return CompletableFuture.supplyAsync(() -> serverNames
4312      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4313        filterParams, limit, logType))
4314      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4315  }
4316
4317  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4318    Map<String, Object> filterParams, int limit, String logType) {
4319    return this.<List<LogEntry>> newAdminCaller()
4320      .action((controller, stub) -> this.adminCall(controller, stub,
4321        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4322        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4323      .serverName(serverName).call();
4324  }
4325
4326  @Override
4327  public CompletableFuture<List<Boolean>>
4328    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4329    if (CollectionUtils.isEmpty(serverNames)) {
4330      return CompletableFuture.completedFuture(Collections.emptyList());
4331    }
4332    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4333      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4334    return convertToFutureOfList(clearSlowLogResponseList);
4335  }
4336
4337  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4338    return this.<Boolean> newAdminCaller()
4339      .action((controller, stub) -> this.adminCall(controller, stub,
4340        RequestConverter.buildClearSlowLogResponseRequest(),
4341        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4342      .serverName(serverName).call();
4343  }
4344
4345  private static <T> CompletableFuture<List<T>>
4346    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4347    CompletableFuture<Void> allDoneFuture =
4348      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4349    return allDoneFuture
4350      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4351  }
4352
4353  @Override
4354  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4355    return this.<List<TableName>> newMasterCaller()
4356      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4357        List<TableName>> call(controller, stub,
4358          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4359          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4360            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4361      .call();
4362  }
4363
4364  @Override
4365  public CompletableFuture<Pair<List<String>, List<TableName>>>
4366    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4367    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4368      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4369        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4370        Pair<List<String>, List<TableName>>> call(controller, stub,
4371          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4372            .build(),
4373          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4374          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4375            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4376      .call();
4377  }
4378
4379  @Override
4380  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4381    return this.<RSGroupInfo> newMasterCaller()
4382      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4383        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4384          GetRSGroupInfoOfServerRequest.newBuilder()
4385            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4386              .setPort(hostPort.getPort()).build())
4387            .build(),
4388          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4389          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4390      .call();
4391  }
4392
4393  @Override
4394  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4395    return this.<Void> newMasterCaller()
4396      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4397        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4398        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4399      .call();
4400  }
4401
4402  @Override
4403  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4404    CompletableFuture<Void> future = new CompletableFuture<>();
4405    for (TableName tableName : tables) {
4406      addListener(tableExists(tableName), (exist, err) -> {
4407        if (err != null) {
4408          future.completeExceptionally(err);
4409          return;
4410        }
4411        if (!exist) {
4412          future.completeExceptionally(new TableNotFoundException(tableName));
4413          return;
4414        }
4415      });
4416    }
4417    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4418      if (err != null) {
4419        future.completeExceptionally(err);
4420        return;
4421      }
4422      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4423        future.complete(null);
4424        return;
4425      }
4426      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4427      for (TableDescriptor td : tableDescriptions) {
4428        newTableDescriptors
4429          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4430      }
4431      addListener(
4432        CompletableFuture.allOf(
4433          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4434        (v, e) -> {
4435          if (e != null) {
4436            future.completeExceptionally(e);
4437          } else {
4438            future.complete(v);
4439          }
4440        });
4441    });
4442    return future;
4443  }
4444
4445  @Override
4446  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4447    return this.<RSGroupInfo> newMasterCaller()
4448      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4449        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4450          GetRSGroupInfoOfTableRequest.newBuilder()
4451            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4452          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4453          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4454      .call();
4455  }
4456
4457  @Override
4458  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4459    return this.<RSGroupInfo> newMasterCaller()
4460      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4461        RSGroupInfo> call(controller, stub,
4462          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4463          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4464          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4465      .call();
4466  }
4467
4468  @Override
4469  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4470    return this.<Void> newMasterCaller()
4471      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4472        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4473          .setNewRsgroupName(newName).build(),
4474        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4475      .call();
4476  }
4477
4478  @Override
4479  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4480    Map<String, String> configuration) {
4481    UpdateRSGroupConfigRequest.Builder request =
4482      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4483    if (configuration != null) {
4484      configuration.entrySet().forEach(e -> request.addConfiguration(
4485        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4486    }
4487    return this.<Void> newMasterCaller()
4488      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4489        Void> call(controller, stub, request.build(),
4490          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4491      .call();
4492  }
4493
4494  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4495    return this.<List<LogEntry>> newMasterCaller()
4496      .action((controller, stub) -> this.call(controller, stub,
4497        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4498        ProtobufUtil::toBalancerDecisionResponse))
4499      .call();
4500  }
4501
4502  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4503    return this.<List<LogEntry>> newMasterCaller()
4504      .action((controller, stub) -> this.call(controller, stub,
4505        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4506        ProtobufUtil::toBalancerRejectionResponse))
4507      .call();
4508  }
4509
4510  @Override
4511  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4512    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4513    if (logType == null || serverType == null) {
4514      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4515    }
4516    switch (logType) {
4517      case "SLOW_LOG":
4518      case "LARGE_LOG":
4519        if (ServerType.MASTER.equals(serverType)) {
4520          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4521        }
4522        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4523      case "BALANCER_DECISION":
4524        if (ServerType.REGION_SERVER.equals(serverType)) {
4525          throw new IllegalArgumentException(
4526            "Balancer Decision logs are not maintained by HRegionServer");
4527        }
4528        return getBalancerDecisions(limit);
4529      case "BALANCER_REJECTION":
4530        if (ServerType.REGION_SERVER.equals(serverType)) {
4531          throw new IllegalArgumentException(
4532            "Balancer Rejection logs are not maintained by HRegionServer");
4533        }
4534        return getBalancerRejections(limit);
4535      default:
4536        return CompletableFuture.completedFuture(Collections.emptyList());
4537    }
4538  }
4539
4540  @Override
4541  public CompletableFuture<Void> flushMasterStore() {
4542    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4543    return this.<Void> newMasterCaller()
4544      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4545        Void> call(controller, stub, request.build(),
4546          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4547      .call();
4548  }
4549
4550  @Override
4551  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4552    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4553    return this.<List<String>> newAdminCaller()
4554      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4555        List<String>> adminCall(controller, stub, request.build(),
4556          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4557          resp -> resp.getCachedFilesList()))
4558      .serverName(serverName).call();
4559  }
4560}