001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.net.URI;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.CatalogFamilyFormat;
053import org.apache.hadoop.hbase.ClientMetaTableAccessor;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.NamespaceDescriptor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.RegionMetrics;
063import org.apache.hadoop.hbase.RegionMetricsBuilder;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableExistsException;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotDisabledException;
068import org.apache.hadoop.hbase.TableNotEnabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.Scan.ReadType;
075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
076import org.apache.hadoop.hbase.client.replication.TableCFs;
077import org.apache.hadoop.hbase.client.security.SecurityCapability;
078import org.apache.hadoop.hbase.exceptions.DeserializationException;
079import org.apache.hadoop.hbase.ipc.HBaseRpcController;
080import org.apache.hadoop.hbase.net.Address;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
086import org.apache.hadoop.hbase.replication.ReplicationException;
087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
089import org.apache.hadoop.hbase.replication.SyncReplicationState;
090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
092import org.apache.hadoop.hbase.security.access.Permission;
093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
094import org.apache.hadoop.hbase.security.access.UserPermission;
095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.Strings;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
108import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
109import org.apache.hbase.thirdparty.com.google.protobuf.Message;
110import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
111import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
112import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
113import org.apache.hbase.thirdparty.io.netty.util.Timeout;
114import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
116
117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
118import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshHFilesRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshHFilesResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshMetaRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RefreshMetaResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
357import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
358import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
359import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
360import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
361import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
362import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
363import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
364import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
365import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
366import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
367import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
368
369/**
370 * The implementation of AsyncAdmin.
371 * <p>
 * The word 'Raw' means that this is a low-level class. The returned {@link CompletableFuture} will
 * be finished inside the RPC framework thread, which means that the callbacks registered to the
 * {@link CompletableFuture} will also be executed inside the RPC framework thread. So users of
 * this class should not perform time-consuming tasks in the callbacks.
376 * @since 2.0.0
377 * @see AsyncHBaseAdmin
378 * @see AsyncConnection#getAdmin()
379 * @see AsyncConnection#getAdminBuilder()
380 */
381@InterfaceAudience.Private
382class RawAsyncHBaseAdmin implements AsyncAdmin {
383
  /**
   * Procedure signature handed to {@code execProcedure} by the legacy flush path, which is used
   * when the master side flush table procedure call fails with an unrecoverable error.
   */
  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";

  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncHBaseAdmin.class);

  /** Connection used to create request callers, reach the registry and obtain the meta table. */
  private final AsyncConnectionImpl connection;

  // NOTE(review): only assigned in the visible part of this class; presumably used to schedule
  // delayed retries further down the file -- confirm against its usages.
  private final HashedWheelTimer retryTimer;

  /** Table handle for {@code META_TABLE_NAME}, obtained from the connection at construction. */
  private final AsyncTable<AdvancedScanResultConsumer> metaTable;

  /** Rpc timeout in nanoseconds, applied to every master/admin caller built by this class. */
  private final long rpcTimeoutNs;

  /** Operation timeout in nanoseconds, applied to every master/admin caller built here. */
  private final long operationTimeoutNs;

  /** Normal pause between attempts, in nanoseconds. */
  private final long pauseNs;

  /**
   * Pause used when the server is overloaded, in nanoseconds. The constructor guarantees this is
   * never smaller than {@link #pauseNs}.
   */
  private final long pauseNsForServerOverloaded;

  /** Maximum number of attempts passed to every caller built by this class. */
  private final int maxAttempts;

  /** Value passed to {@code startLogErrorsCnt} of every caller built by this class. */
  private final int startLogErrorsCnt;

  /** Nonce generator of the connection; supplies the nonce group and nonce for requests. */
  private final NonceGenerator ng;
407
408  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
409    AsyncAdminBuilderBase builder) {
410    this.connection = connection;
411    this.retryTimer = retryTimer;
412    this.metaTable = connection.getTable(META_TABLE_NAME);
413    this.rpcTimeoutNs = builder.rpcTimeoutNs;
414    this.operationTimeoutNs = builder.operationTimeoutNs;
415    this.pauseNs = builder.pauseNs;
416    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
417      LOG.warn(
418        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
419          + " the normal pause value {} ms, use the greater one instead",
420        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
421        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
422      this.pauseNsForServerOverloaded = builder.pauseNs;
423    } else {
424      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
425    }
426    this.maxAttempts = builder.maxAttempts;
427    this.startLogErrorsCnt = builder.startLogErrorsCnt;
428    this.ng = connection.getNonceGenerator();
429  }
430
431  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
432    return this.connection.callerFactory.<T> masterRequest()
433      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
434      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
435      .pause(pauseNs, TimeUnit.NANOSECONDS)
436      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
437      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
438  }
439
440  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
441    return this.connection.callerFactory.<T> adminRequest()
442      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
443      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
444      .pause(pauseNs, TimeUnit.NANOSECONDS)
445      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
446      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
447  }
448
  /**
   * A single rpc invocation against a master service stub. The response is delivered to the
   * {@code done} callback rather than returned.
   * @param <RESP> response type handed to the callback
   * @param <REQ>  request type
   */
  @FunctionalInterface
  private interface MasterRpcCall<RESP, REQ> {
    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
      RpcCallback<RESP> done);
  }
454
  /**
   * A single rpc invocation against an admin service stub. The response is delivered to the
   * {@code done} callback rather than returned.
   * @param <RESP> response type handed to the callback
   * @param <REQ>  request type
   */
  @FunctionalInterface
  private interface AdminRpcCall<RESP, REQ> {
    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
      RpcCallback<RESP> done);
  }
460
  /**
   * Converts a value of type {@code S} into a value of type {@code D}; the conversion is allowed
   * to fail with an {@link IOException}.
   * @param <D> destination type
   * @param <S> source type
   */
  @FunctionalInterface
  private interface Converter<D, S> {
    D convert(S src) throws IOException;
  }
465
466  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
467    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
468    Converter<RESP, PRESP> respConverter) {
469    CompletableFuture<RESP> future = new CompletableFuture<>();
470    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
471
472      @Override
473      public void run(PRESP resp) {
474        if (controller.failed()) {
475          future.completeExceptionally(controller.getFailed());
476        } else {
477          try {
478            future.complete(respConverter.convert(resp));
479          } catch (IOException e) {
480            future.completeExceptionally(e);
481          }
482        }
483      }
484    });
485    return future;
486  }
487
488  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
489    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
490    Converter<RESP, PRESP> respConverter) {
491    CompletableFuture<RESP> future = new CompletableFuture<>();
492    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
493
494      @Override
495      public void run(PRESP resp) {
496        if (controller.failed()) {
497          future.completeExceptionally(controller.getFailed());
498        } else {
499          try {
500            future.complete(respConverter.convert(resp));
501          } catch (IOException e) {
502            future.completeExceptionally(e);
503          }
504        }
505      }
506    });
507    return future;
508  }
509
510  /**
511   * short-circuit call for
512   * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)}
513   * by ignoring procedure result
514   */
515  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
516    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
517    ProcedureBiConsumer<Void> consumer) {
518    return procedureCall(preq, rpcCall, respConverter, result -> null, consumer);
519  }
520
521  /**
522   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
523   * ProcedureBiConsumer) by skip setting priority for request
524   */
525  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq,
526    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
527    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
528    return procedureCall(b -> {
529    }, preq, rpcCall, respConverter, resultConverter, consumer);
530  }
531
532  /**
533   * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter,
534   * ProcedureBiConsumer) by ignoring procedure result
535   */
536  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
537    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
538    ProcedureBiConsumer<Void> consumer) {
539    return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer);
540  }
541
542  /**
543   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
544   * ProcedureBiConsumer) by skip setting priority for request
545   */
546  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq,
547    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
548    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
549    return procedureCall(b -> b.tableName(tableName), preq, rpcCall, respConverter, resultConverter,
550      consumer);
551  }
552
553  /**
554   * @param <PREQ>          type of request
555   * @param <PRESP>         type of response
556   * @param <PRES>          type of procedure call result
557   * @param prioritySetter  prioritySetter set priority by table for request
558   * @param preq            procedure call request
559   * @param rpcCall         procedure rpc call
560   * @param respConverter   extract proc id from procedure call response
561   * @param resultConverter extract result from procedure call result
562   * @param consumer        action performs on result
563   * @return procedure call result, null if procedure is void
564   */
565  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(
566    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
567    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
568    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
569    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller()
570      .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter));
571    prioritySetter.accept(builder);
572    CompletableFuture<Long> procFuture = builder.call();
573    CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter);
574    addListener(future, consumer);
575    return future;
576  }
577
578  @Override
579  public CompletableFuture<Boolean> tableExists(TableName tableName) {
580    if (TableName.isMetaTableName(tableName)) {
581      return CompletableFuture.completedFuture(true);
582    }
583    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
584  }
585
586  @Override
587  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
588    return getTableDescriptors(
589      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
590  }
591
592  /**
593   * {@link #listTableDescriptors(boolean)}
594   */
595  @Override
596  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
597    boolean includeSysTables) {
598    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
599      + "use listTableDescriptors(boolean) instead");
600    return getTableDescriptors(
601      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
602  }
603
604  @Override
605  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
606    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
607      + "use listTableDescriptors(boolean) instead");
608    if (tableNames.isEmpty()) {
609      return CompletableFuture.completedFuture(Collections.emptyList());
610    }
611    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
612  }
613
614  private CompletableFuture<List<TableDescriptor>>
615    getTableDescriptors(GetTableDescriptorsRequest request) {
616    return this.<List<TableDescriptor>> newMasterCaller()
617      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
618        List<TableDescriptor>> call(controller, stub, request,
619          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
620          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
621      .call();
622  }
623
624  @Override
625  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
626    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
627  }
628
629  @Override
630  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
631    boolean includeSysTables) {
632    Preconditions.checkNotNull(pattern,
633      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
634    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
635  }
636
637  @Override
638  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
639    return this.<List<TableName>> newMasterCaller()
640      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
641        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
642          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
643          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
644          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
645      .call();
646  }
647
648  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
649    return this.<List<TableName>> newMasterCaller()
650      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
651        List<TableName>> call(controller, stub, request,
652          (s, c, req, done) -> s.getTableNames(c, req, done),
653          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
654      .call();
655  }
656
657  @Override
658  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
659    return this.<List<TableDescriptor>> newMasterCaller()
660      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
661        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
662          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
663          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
664          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
665      .call();
666  }
667
668  @Override
669  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
670    return this.<List<TableDescriptor>> newMasterCaller()
671      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
672        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
673          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
674          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
675          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
676      .call();
677  }
678
679  @Override
680  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
681    return this.<List<TableName>> newMasterCaller()
682      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
683        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
684          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
685          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
686          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
687      .call();
688  }
689
690  @Override
691  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
692    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
693    addListener(this.<List<TableSchema>> newMasterCaller().tableName(tableName)
694      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
695        List<TableSchema>> call(controller, stub,
696          RequestConverter.buildGetTableDescriptorsRequest(tableName),
697          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
698          (resp) -> resp.getTableSchemaList()))
699      .call(), (tableSchemas, error) -> {
700        if (error != null) {
701          future.completeExceptionally(error);
702          return;
703        }
704        if (!tableSchemas.isEmpty()) {
705          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
706        } else {
707          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
708        }
709      });
710    return future;
711  }
712
713  @Override
714  public CompletableFuture<Void> createTable(TableDescriptor desc) {
715    return createTable(desc.getTableName(),
716      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
717  }
718
719  @Override
720  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
721    int numRegions) {
722    try {
723      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
724    } catch (IllegalArgumentException e) {
725      return failedFuture(e);
726    }
727  }
728
729  @Override
730  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
731    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
732      + " use createTable(TableDescriptor) instead");
733    try {
734      verifySplitKeys(splitKeys);
735      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
736        splitKeys, ng.getNonceGroup(), ng.newNonce()));
737    } catch (IllegalArgumentException e) {
738      return failedFuture(e);
739    }
740  }
741
742  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
743    Preconditions.checkNotNull(tableName, "table name is null");
744    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
745      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
746      new CreateTableProcedureBiConsumer(tableName));
747  }
748
749  @Override
750  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
751    return modifyTable(desc, true);
752  }
753
754  @Override
755  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
756    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
757      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
758        ng.newNonce(), reopenRegions),
759      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
760      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
761  }
762
763  @Override
764  public CompletableFuture<Void> reopenTableRegions(TableName tableName) {
765    return reopenTableRegions(tableName, Collections.emptyList());
766  }
767
768  @Override
769  public CompletableFuture<Void> reopenTableRegions(TableName tableName, List<RegionInfo> regions) {
770    List<byte[]> regionNames = regions.stream().map(RegionInfo::getRegionName).toList();
771    return this.<ReopenTableRegionsRequest, ReopenTableRegionsResponse> procedureCall(tableName,
772      RequestConverter.buildReopenTableRegionsRequest(tableName, regionNames, ng.getNonceGroup(),
773        ng.newNonce()),
774      (s, c, req, done) -> s.reopenTableRegions(c, req, done), (resp) -> resp.getProcId(),
775      new ReopenTableRegionsProcedureBiConsumer(this, tableName));
776  }
777
778  @Override
779  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
780    return this.<ModifyTableStoreFileTrackerRequest,
781      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
782        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
783          ng.getNonceGroup(), ng.newNonce()),
784        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
785        (resp) -> resp.getProcId(),
786        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
787  }
788
789  @Override
790  public CompletableFuture<Void> deleteTable(TableName tableName) {
791    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
792      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
793      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
794      new DeleteTableProcedureBiConsumer(tableName));
795  }
796
797  @Override
798  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
799    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
800      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
801        ng.newNonce()),
802      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
803      new TruncateTableProcedureBiConsumer(tableName));
804  }
805
806  @Override
807  public CompletableFuture<Void> enableTable(TableName tableName) {
808    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
809      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
810      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
811      new EnableTableProcedureBiConsumer(tableName));
812  }
813
814  @Override
815  public CompletableFuture<Void> disableTable(TableName tableName) {
816    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
817      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
818      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
819      new DisableTableProcedureBiConsumer(tableName));
820  }
821
822  /**
823   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
824   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
825   * targetState).
826   */
827  private static CompletableFuture<Boolean> completeCheckTableState(
828    CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error,
829    TableState.State targetState, TableName tableName) {
830    if (error != null) {
831      future.completeExceptionally(error);
832    } else {
833      if (tableState.isPresent()) {
834        future.complete(tableState.get().inStates(targetState));
835      } else {
836        future.completeExceptionally(new TableNotFoundException(tableName));
837      }
838    }
839    return future;
840  }
841
842  @Override
843  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
844    if (TableName.isMetaTableName(tableName)) {
845      return CompletableFuture.completedFuture(true);
846    }
847    CompletableFuture<Boolean> future = new CompletableFuture<>();
848    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
849      (tableState, error) -> {
850        completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName);
851      });
852    return future;
853  }
854
855  @Override
856  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
857    if (TableName.isMetaTableName(tableName)) {
858      return CompletableFuture.completedFuture(false);
859    }
860    CompletableFuture<Boolean> future = new CompletableFuture<>();
861    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
862      (tableState, error) -> {
863        completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName);
864      });
865    return future;
866  }
867
868  @Override
869  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
870    if (TableName.isMetaTableName(tableName)) {
871      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
872        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
873    }
874    CompletableFuture<Boolean> future = new CompletableFuture<>();
875    addListener(isTableEnabled(tableName), (enabled, error) -> {
876      if (error != null) {
877        if (error instanceof TableNotFoundException) {
878          future.complete(false);
879        } else {
880          future.completeExceptionally(error);
881        }
882        return;
883      }
884      if (!enabled) {
885        future.complete(false);
886      } else {
887        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
888          (locations, error1) -> {
889            if (error1 != null) {
890              future.completeExceptionally(error1);
891              return;
892            }
893            List<HRegionLocation> notDeployedRegions = locations.stream()
894              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
895            if (notDeployedRegions.size() > 0) {
896              if (LOG.isDebugEnabled()) {
897                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
898              }
899              future.complete(false);
900              return;
901            }
902            future.complete(true);
903          });
904      }
905    });
906    return future;
907  }
908
909  @Override
910  public CompletableFuture<Void> addColumnFamily(TableName tableName,
911    ColumnFamilyDescriptor columnFamily) {
912    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
913      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
914        ng.newNonce()),
915      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
916      new AddColumnFamilyProcedureBiConsumer(tableName));
917  }
918
919  @Override
920  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
921    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
922      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
923        ng.newNonce()),
924      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
925      new DeleteColumnFamilyProcedureBiConsumer(tableName));
926  }
927
928  @Override
929  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
930    ColumnFamilyDescriptor columnFamily) {
931    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
932      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
933        ng.newNonce()),
934      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
935      new ModifyColumnFamilyProcedureBiConsumer(tableName));
936  }
937
938  @Override
939  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
940    byte[] family, String dstSFT) {
941    return this.<ModifyColumnStoreFileTrackerRequest,
942      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
943        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
944          ng.getNonceGroup(), ng.newNonce()),
945        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
946        (resp) -> resp.getProcId(),
947        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
948  }
949
950  @Override
951  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
952    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
953      RequestConverter.buildCreateNamespaceRequest(descriptor),
954      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
955      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
956  }
957
958  @Override
959  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
960    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
961      RequestConverter.buildModifyNamespaceRequest(descriptor),
962      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
963      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
964  }
965
966  @Override
967  public CompletableFuture<Void> deleteNamespace(String name) {
968    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
969      RequestConverter.buildDeleteNamespaceRequest(name),
970      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
971      new DeleteNamespaceProcedureBiConsumer(name));
972  }
973
974  @Override
975  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
976    return this.<NamespaceDescriptor> newMasterCaller()
977      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
978        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
979          RequestConverter.buildGetNamespaceDescriptorRequest(name),
980          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
981          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
982      .call();
983  }
984
985  @Override
986  public CompletableFuture<List<String>> listNamespaces() {
987    return this.<List<String>> newMasterCaller()
988      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
989        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
990          (s, c, req, done) -> s.listNamespaces(c, req, done),
991          (resp) -> resp.getNamespaceNameList()))
992      .call();
993  }
994
995  @Override
996  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
997    return this.<List<NamespaceDescriptor>> newMasterCaller()
998      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
999        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
1000          ListNamespaceDescriptorsRequest.newBuilder().build(),
1001          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
1002          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
1003      .call();
1004  }
1005
1006  @Override
1007  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
1008    return this.<List<RegionInfo>> newAdminCaller()
1009      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
1010        List<RegionInfo>> adminCall(controller, stub,
1011          RequestConverter.buildGetOnlineRegionRequest(),
1012          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
1013          resp -> ProtobufUtil.getRegionInfos(resp)))
1014      .serverName(serverName).call();
1015  }
1016
1017  @Override
1018  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
1019    if (tableName.equals(META_TABLE_NAME)) {
1020      return connection.registry.getMetaRegionLocations()
1021        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
1022          .collect(Collectors.toList()));
1023    } else {
1024      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
1025        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
1026    }
1027  }
1028
1029  @Override
1030  public CompletableFuture<Void> flush(TableName tableName) {
1031    return flush(tableName, Collections.emptyList());
1032  }
1033
1034  @Override
1035  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
1036    Preconditions.checkNotNull(columnFamily,
1037      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1038    return flush(tableName, Collections.singletonList(columnFamily));
1039  }
1040
1041  @Override
1042  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
1043    // This is for keeping compatibility with old implementation.
1044    // If the server version is lower than the client version, it's possible that the
1045    // flushTable method is not present in the server side, if so, we need to fall back
1046    // to the old implementation.
1047    Preconditions.checkNotNull(columnFamilyList,
1048      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1049    List<byte[]> columnFamilies = columnFamilyList.stream()
1050      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
1051    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
1052      ng.getNonceGroup(), ng.newNonce());
1053    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
1054      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
1055      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
1056    CompletableFuture<Void> future = new CompletableFuture<>();
1057    addListener(procFuture, (ret, error) -> {
1058      if (error != null) {
1059        if (
1060          error instanceof TableNotFoundException || error instanceof TableNotEnabledException
1061            || error instanceof NoSuchColumnFamilyException
1062        ) {
1063          future.completeExceptionally(error);
1064        } else if (error instanceof DoNotRetryIOException) {
1065          // usually this is caused by the method is not present on the server or
1066          // the hbase hadoop version does not match the running hadoop version.
1067          // if that happens, we need fall back to the old flush implementation.
1068          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
1069          legacyFlush(future, tableName, columnFamilies);
1070        } else {
1071          future.completeExceptionally(error);
1072        }
1073      } else {
1074        future.complete(ret);
1075      }
1076    });
1077    return future;
1078  }
1079
1080  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1081    List<byte[]> columnFamilies) {
1082    addListener(tableExists(tableName), (exists, err) -> {
1083      if (err != null) {
1084        future.completeExceptionally(err);
1085      } else if (!exists) {
1086        future.completeExceptionally(new TableNotFoundException(tableName));
1087      } else {
1088        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1089          if (err2 != null) {
1090            future.completeExceptionally(err2);
1091          } else if (!tableEnabled) {
1092            future.completeExceptionally(new TableNotEnabledException(tableName));
1093          } else {
1094            Map<String, String> props = new HashMap<>();
1095            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1096              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1097                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1098            }
1099            addListener(
1100              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1101              (ret, err3) -> {
1102                if (err3 != null) {
1103                  future.completeExceptionally(err3);
1104                } else {
1105                  future.complete(ret);
1106                }
1107              });
1108          }
1109        });
1110      }
1111    });
1112  }
1113
1114  @Override
1115  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1116    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1117    });
1118  }
1119
1120  @Override
1121  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1122    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1123      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1124    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1125    });
1126  }
1127
1128  /**
1129   * This method is for internal use only, where we need the response of the flush.
1130   * <p/>
1131   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1132   * API.
1133   */
1134  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1135    boolean writeFlushWALMarker) {
1136    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1137    addListener(getRegionLocation(regionName), (location, err) -> {
1138      if (err != null) {
1139        future.completeExceptionally(err);
1140        return;
1141      }
1142      ServerName serverName = location.getServerName();
1143      if (serverName == null) {
1144        future
1145          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1146        return;
1147      }
1148      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1149        (ret, err2) -> {
1150          if (err2 != null) {
1151            future.completeExceptionally(err2);
1152          } else {
1153            future.complete(ret);
1154          }
1155        });
1156    });
1157    return future;
1158  }
1159
1160  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1161    byte[] columnFamily, boolean writeFlushWALMarker) {
1162    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1163      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1164        FlushRegionResponse> adminCall(controller, stub,
1165          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1166            writeFlushWALMarker),
1167          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1168      .call();
1169  }
1170
1171  @Override
1172  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1173    CompletableFuture<Void> future = new CompletableFuture<>();
1174    addListener(getRegions(sn), (hRegionInfos, err) -> {
1175      if (err != null) {
1176        future.completeExceptionally(err);
1177        return;
1178      }
1179      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1180      if (hRegionInfos != null) {
1181        hRegionInfos
1182          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1183          })));
1184      }
1185      addListener(CompletableFuture.allOf(
1186        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1187          if (err2 != null) {
1188            future.completeExceptionally(err2);
1189          } else {
1190            future.complete(ret);
1191          }
1192        });
1193    });
1194    return future;
1195  }
1196
1197  @Override
1198  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1199    return compact(tableName, null, false, compactType);
1200  }
1201
1202  @Override
1203  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1204    CompactType compactType) {
1205    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1206      + "If you don't specify a columnFamily, use compact(TableName) instead");
1207    return compact(tableName, columnFamily, false, compactType);
1208  }
1209
1210  @Override
1211  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1212    return compactRegion(regionName, null, false);
1213  }
1214
1215  @Override
1216  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1217    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1218      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1219    return compactRegion(regionName, columnFamily, false);
1220  }
1221
1222  @Override
1223  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1224    return compact(tableName, null, true, compactType);
1225  }
1226
1227  @Override
1228  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1229    CompactType compactType) {
1230    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1231      + "If you don't specify a columnFamily, use compact(TableName) instead");
1232    return compact(tableName, columnFamily, true, compactType);
1233  }
1234
1235  @Override
1236  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1237    return compactRegion(regionName, null, true);
1238  }
1239
1240  @Override
1241  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1242    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1243      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1244    return compactRegion(regionName, columnFamily, true);
1245  }
1246
1247  @Override
1248  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1249    return compactRegionServer(sn, false);
1250  }
1251
1252  @Override
1253  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1254    return compactRegionServer(sn, true);
1255  }
1256
1257  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1258    CompletableFuture<Void> future = new CompletableFuture<>();
1259    addListener(getRegions(sn), (hRegionInfos, err) -> {
1260      if (err != null) {
1261        future.completeExceptionally(err);
1262        return;
1263      }
1264      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1265      if (hRegionInfos != null) {
1266        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1267      }
1268      addListener(CompletableFuture.allOf(
1269        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1270          if (err2 != null) {
1271            future.completeExceptionally(err2);
1272          } else {
1273            future.complete(ret);
1274          }
1275        });
1276    });
1277    return future;
1278  }
1279
1280  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1281    boolean major) {
1282    CompletableFuture<Void> future = new CompletableFuture<>();
1283    addListener(getRegionLocation(regionName), (location, err) -> {
1284      if (err != null) {
1285        future.completeExceptionally(err);
1286        return;
1287      }
1288      ServerName serverName = location.getServerName();
1289      if (serverName == null) {
1290        future
1291          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1292        return;
1293      }
1294      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1295        (ret, err2) -> {
1296          if (err2 != null) {
1297            future.completeExceptionally(err2);
1298          } else {
1299            future.complete(ret);
1300          }
1301        });
1302    });
1303    return future;
1304  }
1305
1306  /**
1307   * List all region locations for the specific table.
1308   */
1309  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1310    if (TableName.META_TABLE_NAME.equals(tableName)) {
1311      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1312      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1313        if (err != null) {
1314          future.completeExceptionally(err);
1315        } else if (
1316          metaRegions == null || metaRegions.isEmpty()
1317            || metaRegions.getDefaultRegionLocation() == null
1318        ) {
1319          future.completeExceptionally(new IOException("meta region does not found"));
1320        } else {
1321          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1322        }
1323      });
1324      return future;
1325    } else {
1326      // For non-meta table, we fetch all locations by scanning hbase:meta table
1327      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1328    }
1329  }
1330
1331  /**
1332   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1333   */
1334  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1335    CompactType compactType) {
1336    CompletableFuture<Void> future = new CompletableFuture<>();
1337
1338    switch (compactType) {
1339      case MOB:
1340        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1341          if (err != null) {
1342            future.completeExceptionally(err);
1343            return;
1344          }
1345          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1346          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1347            if (err2 != null) {
1348              future.completeExceptionally(err2);
1349            } else {
1350              future.complete(ret);
1351            }
1352          });
1353        });
1354        break;
1355      case NORMAL:
1356        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1357          if (err != null) {
1358            future.completeExceptionally(err);
1359            return;
1360          }
1361          if (locations == null || locations.isEmpty()) {
1362            future.completeExceptionally(new TableNotFoundException(tableName));
1363            return;
1364          }
1365          CompletableFuture<?>[] compactFutures =
1366            locations.stream().filter(l -> l.getRegion() != null)
1367              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1368              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1369              .toArray(CompletableFuture<?>[]::new);
1370          // future complete unless all of the compact futures are completed.
1371          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1372            if (err2 != null) {
1373              future.completeExceptionally(err2);
1374            } else {
1375              future.complete(ret);
1376            }
1377          });
1378        });
1379        break;
1380      default:
1381        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1382    }
1383    return future;
1384  }
1385
1386  /**
1387   * Compact the region at specific region server.
1388   */
1389  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1390    final boolean major, byte[] columnFamily) {
1391    return this.<Void> newAdminCaller().serverName(sn)
1392      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1393        Void> adminCall(controller, stub,
1394          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1395          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1396      .call();
1397  }
1398
1399  private byte[] toEncodeRegionName(byte[] regionName) {
1400    return RegionInfo.isEncodedRegionName(regionName)
1401      ? regionName
1402      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1403  }
1404
  /**
   * Looks up one region and checks that it may take part in a merge, reporting through
   * {@code result}.
   * <p/>
   * {@code tableName} is shared by the calls made for every region of one merge request. The first
   * callback to reach the comparison records its table there, every later callback compares its own
   * table against the recorded one.
   * @param encodeRegionName the name used to look the region up
   * @param tableName        holder for the table of the regions seen so far, initially empty
   * @param result           completed exceptionally on a lookup failure, a non-default replica or a
   *                         table mismatch, and completed with the table name by a callback that
   *                         finds its own table already recorded
   */
  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
    CompletableFuture<TableName> result) {
    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
      if (err != null) {
        result.completeExceptionally(err);
        return;
      }
      RegionInfo regionInfo = location.getRegion();
      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
        result.completeExceptionally(
          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
        return;
      }
      // The callback which wins the compareAndSet only records its table and leaves result alone.
      // NOTE(review): result is completed by the first later callback with a matching table, so
      // with more than two regions it may complete before the remaining lookups were checked;
      // their failures would then be dropped on the client side - confirm this is intended.
      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
        if (!tableName.get().equals(regionInfo.getTable())) {
          // tables of this two region should be same.
          result.completeExceptionally(
            new IllegalArgumentException("Cannot merge regions from two different tables "
              + tableName.get() + " and " + regionInfo.getTable()));
        } else {
          result.complete(tableName.get());
        }
      }
    });
  }
1430
1431  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1432    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1433    CompletableFuture<TableName> future = new CompletableFuture<>();
1434    for (byte[] encodedRegionName : encodedRegionNames) {
1435      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1436    }
1437    return future;
1438  }
1439
1440  @Override
1441  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1442    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1443  }
1444
1445  @Override
1446  public CompletableFuture<Boolean> isMergeEnabled() {
1447    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1448  }
1449
1450  @Override
1451  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1452    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1453  }
1454
1455  @Override
1456  public CompletableFuture<Boolean> isSplitEnabled() {
1457    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1458  }
1459
1460  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1461    MasterSwitchType switchType) {
1462    SetSplitOrMergeEnabledRequest request =
1463      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1464    return this.<Boolean> newMasterCaller()
1465      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1466        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1467          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1468          (resp) -> resp.getPrevValueList().get(0)))
1469      .call();
1470  }
1471
1472  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1473    IsSplitOrMergeEnabledRequest request =
1474      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1475    return this.<Boolean> newMasterCaller()
1476      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1477        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1478          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1479      .call();
1480  }
1481
1482  @Override
1483  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1484    if (nameOfRegionsToMerge.size() < 2) {
1485      return failedFuture(new IllegalArgumentException(
1486        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1487    }
1488    CompletableFuture<Void> future = new CompletableFuture<>();
1489    byte[][] encodedNameOfRegionsToMerge =
1490      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1491
1492    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1493      if (err != null) {
1494        future.completeExceptionally(err);
1495        return;
1496      }
1497
1498      final MergeTableRegionsRequest request;
1499      try {
1500        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1501          forcible, ng.getNonceGroup(), ng.newNonce());
1502      } catch (DeserializationException e) {
1503        future.completeExceptionally(e);
1504        return;
1505      }
1506
1507      addListener(
1508        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1509          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1510        (ret, err2) -> {
1511          if (err2 != null) {
1512            future.completeExceptionally(err2);
1513          } else {
1514            future.complete(ret);
1515          }
1516        });
1517    });
1518    return future;
1519  }
1520
1521  @Override
1522  public CompletableFuture<Void> split(TableName tableName) {
1523    CompletableFuture<Void> future = new CompletableFuture<>();
1524    addListener(tableExists(tableName), (exist, error) -> {
1525      if (error != null) {
1526        future.completeExceptionally(error);
1527        return;
1528      }
1529      if (!exist) {
1530        future.completeExceptionally(new TableNotFoundException(tableName));
1531        return;
1532      }
1533      addListener(metaTable
1534        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1535          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1536            ClientMetaTableAccessor.QueryType.REGION))
1537          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1538            ClientMetaTableAccessor.QueryType.REGION))),
1539        (results, err2) -> {
1540          if (err2 != null) {
1541            future.completeExceptionally(err2);
1542            return;
1543          }
1544          if (results != null && !results.isEmpty()) {
1545            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1546            for (Result r : results) {
1547              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1548                continue;
1549              }
1550              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1551              if (rl != null) {
1552                for (HRegionLocation h : rl.getRegionLocations()) {
1553                  if (h != null && h.getServerName() != null) {
1554                    RegionInfo hri = h.getRegion();
1555                    if (
1556                      hri == null || hri.isSplitParent()
1557                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1558                    ) {
1559                      continue;
1560                    }
1561                    splitFutures.add(split(hri, null));
1562                  }
1563                }
1564              }
1565            }
1566            addListener(
1567              CompletableFuture
1568                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1569              (ret, exception) -> {
1570                if (exception != null) {
1571                  future.completeExceptionally(exception);
1572                  return;
1573                }
1574                future.complete(ret);
1575              });
1576          } else {
1577            future.complete(null);
1578          }
1579        });
1580    });
1581    return future;
1582  }
1583
1584  @Override
1585  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1586    CompletableFuture<Void> result = new CompletableFuture<>();
1587    if (splitPoint == null) {
1588      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1589    }
1590    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1591      (loc, err) -> {
1592        if (err != null) {
1593          result.completeExceptionally(err);
1594        } else if (loc == null || loc.getRegion() == null) {
1595          result.completeExceptionally(new IllegalArgumentException(
1596            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1597        } else {
1598          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1599            if (err2 != null) {
1600              result.completeExceptionally(err2);
1601            } else {
1602              result.complete(ret);
1603            }
1604
1605          });
1606        }
1607      });
1608    return result;
1609  }
1610
1611  @Override
1612  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1613    CompletableFuture<Void> future = new CompletableFuture<>();
1614    addListener(getRegionLocation(regionName), (location, err) -> {
1615      if (err != null) {
1616        future.completeExceptionally(err);
1617        return;
1618      }
1619      RegionInfo regionInfo = location.getRegion();
1620      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1621        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1622          + "Replicas are auto-split when their primary is split."));
1623        return;
1624      }
1625      ServerName serverName = location.getServerName();
1626      if (serverName == null) {
1627        future
1628          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1629        return;
1630      }
1631      addListener(split(regionInfo, null), (ret, err2) -> {
1632        if (err2 != null) {
1633          future.completeExceptionally(err2);
1634        } else {
1635          future.complete(ret);
1636        }
1637      });
1638    });
1639    return future;
1640  }
1641
1642  @Override
1643  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1644    Preconditions.checkNotNull(splitPoint,
1645      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1646    CompletableFuture<Void> future = new CompletableFuture<>();
1647    addListener(getRegionLocation(regionName), (location, err) -> {
1648      if (err != null) {
1649        future.completeExceptionally(err);
1650        return;
1651      }
1652      RegionInfo regionInfo = location.getRegion();
1653      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1654        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1655          + "Replicas are auto-split when their primary is split."));
1656        return;
1657      }
1658      ServerName serverName = location.getServerName();
1659      if (serverName == null) {
1660        future
1661          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1662        return;
1663      }
1664      if (
1665        regionInfo.getStartKey() != null
1666          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1667      ) {
1668        future.completeExceptionally(
1669          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1670        return;
1671      }
1672      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1673        if (err2 != null) {
1674          future.completeExceptionally(err2);
1675        } else {
1676          future.complete(ret);
1677        }
1678      });
1679    });
1680    return future;
1681  }
1682
1683  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1684    CompletableFuture<Void> future = new CompletableFuture<>();
1685    TableName tableName = hri.getTable();
1686    final SplitTableRegionRequest request;
1687    try {
1688      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1689        ng.newNonce());
1690    } catch (DeserializationException e) {
1691      future.completeExceptionally(e);
1692      return future;
1693    }
1694
1695    addListener(
1696      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1697        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1698      (ret, err2) -> {
1699        if (err2 != null) {
1700          future.completeExceptionally(err2);
1701        } else {
1702          future.complete(ret);
1703        }
1704      });
1705    return future;
1706  }
1707
1708  @Override
1709  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1710    CompletableFuture<Void> future = new CompletableFuture<>();
1711    addListener(getRegionLocation(regionName), (location, err) -> {
1712      if (err != null) {
1713        future.completeExceptionally(err);
1714        return;
1715      }
1716      RegionInfo regionInfo = location.getRegion();
1717      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1718        future.completeExceptionally(new IllegalArgumentException(
1719          "Can't truncate replicas directly.Replicas are auto-truncated "
1720            + "when their primary is truncated."));
1721        return;
1722      }
1723      ServerName serverName = location.getServerName();
1724      if (serverName == null) {
1725        future
1726          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1727        return;
1728      }
1729      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1730        if (err2 != null) {
1731          future.completeExceptionally(err2);
1732        } else {
1733          future.complete(ret);
1734        }
1735      });
1736    });
1737    return future;
1738  }
1739
1740  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1741    CompletableFuture<Void> future = new CompletableFuture<>();
1742    TableName tableName = hri.getTable();
1743    final MasterProtos.TruncateRegionRequest request;
1744    try {
1745      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1746    } catch (DeserializationException e) {
1747      future.completeExceptionally(e);
1748      return future;
1749    }
1750    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1751      MasterProtos.TruncateRegionResponse::getProcId,
1752      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1753        if (err2 != null) {
1754          future.completeExceptionally(err2);
1755        } else {
1756          future.complete(ret);
1757        }
1758      });
1759    return future;
1760  }
1761
1762  @Override
1763  public CompletableFuture<Void> assign(byte[] regionName) {
1764    CompletableFuture<Void> future = new CompletableFuture<>();
1765    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1766      if (err != null) {
1767        future.completeExceptionally(err);
1768        return;
1769      }
1770      addListener(
1771        this.<Void> newMasterCaller().tableName(regionInfo.getTable())
1772          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1773            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1774            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1775          .call(),
1776        (ret, err2) -> {
1777          if (err2 != null) {
1778            future.completeExceptionally(err2);
1779          } else {
1780            future.complete(ret);
1781          }
1782        });
1783    });
1784    return future;
1785  }
1786
1787  @Override
1788  public CompletableFuture<Void> unassign(byte[] regionName) {
1789    CompletableFuture<Void> future = new CompletableFuture<>();
1790    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1791      if (err != null) {
1792        future.completeExceptionally(err);
1793        return;
1794      }
1795      addListener(
1796        this.<Void> newMasterCaller().tableName(regionInfo.getTable())
1797          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1798            Void> call(controller, stub,
1799              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1800              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1801          .call(),
1802        (ret, err2) -> {
1803          if (err2 != null) {
1804            future.completeExceptionally(err2);
1805          } else {
1806            future.complete(ret);
1807          }
1808        });
1809    });
1810    return future;
1811  }
1812
1813  @Override
1814  public CompletableFuture<Void> offline(byte[] regionName) {
1815    CompletableFuture<Void> future = new CompletableFuture<>();
1816    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1817      if (err != null) {
1818        future.completeExceptionally(err);
1819        return;
1820      }
1821      addListener(this.<Void> newMasterCaller().tableName(regionInfo.getTable())
1822        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1823          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1824          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1825        .call(), (ret, err2) -> {
1826          if (err2 != null) {
1827            future.completeExceptionally(err2);
1828          } else {
1829            future.complete(ret);
1830          }
1831        });
1832    });
1833    return future;
1834  }
1835
1836  @Override
1837  public CompletableFuture<Void> move(byte[] regionName) {
1838    CompletableFuture<Void> future = new CompletableFuture<>();
1839    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1840      if (err != null) {
1841        future.completeExceptionally(err);
1842        return;
1843      }
1844      addListener(
1845        moveRegion(regionInfo,
1846          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1847        (ret, err2) -> {
1848          if (err2 != null) {
1849            future.completeExceptionally(err2);
1850          } else {
1851            future.complete(ret);
1852          }
1853        });
1854    });
1855    return future;
1856  }
1857
1858  @Override
1859  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1860    Preconditions.checkNotNull(destServerName,
1861      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1862    CompletableFuture<Void> future = new CompletableFuture<>();
1863    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1864      if (err != null) {
1865        future.completeExceptionally(err);
1866        return;
1867      }
1868      addListener(
1869        moveRegion(regionInfo, RequestConverter
1870          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1871        (ret, err2) -> {
1872          if (err2 != null) {
1873            future.completeExceptionally(err2);
1874          } else {
1875            future.complete(ret);
1876          }
1877        });
1878    });
1879    return future;
1880  }
1881
1882  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1883    return this.<Void> newMasterCaller().tableName(regionInfo.getTable())
1884      .action(
1885        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1886          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1887      .call();
1888  }
1889
1890  @Override
1891  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1892    return this.<Void> newMasterCaller()
1893      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1894        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1895        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1896      .call();
1897  }
1898
1899  @Override
1900  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1901    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1902    Scan scan = QuotaTableUtil.makeScan(filter);
1903    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1904      new AdvancedScanResultConsumer() {
1905        List<QuotaSettings> settings = new ArrayList<>();
1906
1907        @Override
1908        public void onNext(Result[] results, ScanController controller) {
1909          for (Result result : results) {
1910            try {
1911              QuotaTableUtil.parseResultToCollection(result, settings);
1912            } catch (IOException e) {
1913              controller.terminate();
1914              future.completeExceptionally(e);
1915            }
1916          }
1917        }
1918
1919        @Override
1920        public void onError(Throwable error) {
1921          future.completeExceptionally(error);
1922        }
1923
1924        @Override
1925        public void onComplete() {
1926          future.complete(settings);
1927        }
1928      });
1929    return future;
1930  }
1931
1932  @Override
1933  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1934    boolean enabled) {
1935    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1936      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1937      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1938      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1939  }
1940
1941  @Override
1942  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1943    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1944      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1945      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1946      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1947  }
1948
1949  @Override
1950  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1951    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1952      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1953      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1954      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1955  }
1956
1957  @Override
1958  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1959    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1960      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1961      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1962      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1963  }
1964
1965  @Override
1966  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1967    return this.<ReplicationPeerConfig> newMasterCaller()
1968      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1969        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1970          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1971          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1972          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1973      .call();
1974  }
1975
1976  @Override
1977  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1978    ReplicationPeerConfig peerConfig) {
1979    return this.<UpdateReplicationPeerConfigRequest,
1980      UpdateReplicationPeerConfigResponse> procedureCall(
1981        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1982        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1983        (resp) -> resp.getProcId(),
1984        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1985  }
1986
1987  @Override
1988  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1989    SyncReplicationState clusterState) {
1990    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1991      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1992        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1993          clusterState),
1994        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1995        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1996          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1997  }
1998
1999  @Override
2000  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
2001    Map<TableName, List<String>> tableCfs) {
2002    if (tableCfs == null) {
2003      return failedFuture(new ReplicationException("tableCfs is null"));
2004    }
2005
2006    CompletableFuture<Void> future = new CompletableFuture<>();
2007    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
2008      if (!completeExceptionally(future, error)) {
2009        ReplicationPeerConfig newPeerConfig =
2010          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
2011        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
2012          if (!completeExceptionally(future, error)) {
2013            future.complete(result);
2014          }
2015        });
2016      }
2017    });
2018    return future;
2019  }
2020
2021  @Override
2022  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
2023    Map<TableName, List<String>> tableCfs) {
2024    if (tableCfs == null) {
2025      return failedFuture(new ReplicationException("tableCfs is null"));
2026    }
2027
2028    CompletableFuture<Void> future = new CompletableFuture<>();
2029    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
2030      if (!completeExceptionally(future, error)) {
2031        ReplicationPeerConfig newPeerConfig = null;
2032        try {
2033          newPeerConfig = ReplicationPeerConfigUtil
2034            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
2035        } catch (ReplicationException e) {
2036          future.completeExceptionally(e);
2037          return;
2038        }
2039        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
2040          if (!completeExceptionally(future, error)) {
2041            future.complete(result);
2042          }
2043        });
2044      }
2045    });
2046    return future;
2047  }
2048
2049  @Override
2050  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
2051    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
2052  }
2053
2054  @Override
2055  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
2056    Preconditions.checkNotNull(pattern,
2057      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
2058    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
2059  }
2060
2061  private CompletableFuture<List<ReplicationPeerDescription>>
2062    listReplicationPeers(ListReplicationPeersRequest request) {
2063    return this.<List<ReplicationPeerDescription>> newMasterCaller()
2064      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
2065        List<ReplicationPeerDescription>> call(controller, stub, request,
2066          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
2067          (resp) -> resp.getPeerDescList().stream()
2068            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
2069            .collect(Collectors.toList())))
2070      .call();
2071  }
2072
2073  @Override
2074  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2075    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2076    addListener(listTableDescriptors(), (tables, error) -> {
2077      if (!completeExceptionally(future, error)) {
2078        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2079        tables.forEach(table -> {
2080          Map<String, Integer> cfs = new HashMap<>();
2081          Stream.of(table.getColumnFamilies())
2082            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2083            .forEach(column -> {
2084              cfs.put(column.getNameAsString(), column.getScope());
2085            });
2086          if (!cfs.isEmpty()) {
2087            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2088          }
2089        });
2090        future.complete(replicatedTableCFs);
2091      }
2092    });
2093    return future;
2094  }
2095
2096  @Override
2097  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2098    SnapshotProtos.SnapshotDescription snapshot =
2099      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2100    try {
2101      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2102    } catch (IllegalArgumentException e) {
2103      return failedFuture(e);
2104    }
2105    CompletableFuture<Void> future = new CompletableFuture<>();
2106    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2107      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2108    addListener(this.<SnapshotResponse> newMasterCaller()
2109      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2110        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2111      .call(), (resp, err) -> {
2112        if (err != null) {
2113          future.completeExceptionally(err);
2114          return;
2115        }
2116        waitSnapshotFinish(snapshotDesc, future, resp);
2117      });
2118    return future;
2119  }
2120
2121  // This is for keeping compatibility with old implementation.
2122  // If there is a procId field in the response, then the snapshot will be operated with a
2123  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
2124  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
2125    SnapshotResponse resp) {
2126    if (resp.hasProcId()) {
2127      getProcedureResult(resp.getProcId(), src -> null, future, 0);
2128      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
2129    } else {
2130      long expectedTimeout = resp.getExpectedTimeout();
2131      TimerTask pollingTask = new TimerTask() {
2132        int tries = 0;
2133        long startTime = EnvironmentEdgeManager.currentTime();
2134        long endTime = startTime + expectedTimeout;
2135        long maxPauseTime = expectedTimeout / maxAttempts;
2136
2137        @Override
2138        public void run(Timeout timeout) throws Exception {
2139          if (EnvironmentEdgeManager.currentTime() < endTime) {
2140            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
2141              if (err2 != null) {
2142                future.completeExceptionally(err2);
2143              } else if (done) {
2144                future.complete(null);
2145              } else {
2146                // retry again after pauseTime.
2147                long pauseTime =
2148                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2149                pauseTime = Math.min(pauseTime, maxPauseTime);
2150                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
2151              }
2152            });
2153          } else {
2154            future
2155              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
2156                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
2157          }
2158        }
2159      };
2160      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2161    }
2162  }
2163
2164  @Override
2165  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2166    return this.<Boolean> newMasterCaller()
2167      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2168        Boolean> call(controller, stub,
2169          IsSnapshotDoneRequest.newBuilder()
2170            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2171          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2172      .call();
2173  }
2174
2175  @Override
2176  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2177    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2178      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2179      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2180    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2181  }
2182
2183  @Override
2184  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2185    boolean restoreAcl) {
2186    CompletableFuture<Void> future = new CompletableFuture<>();
2187    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2188      if (err != null) {
2189        future.completeExceptionally(err);
2190        return;
2191      }
2192      TableName tableName = null;
2193      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2194        for (SnapshotDescription snap : snapshotDescriptions) {
2195          if (snap.getName().equals(snapshotName)) {
2196            tableName = snap.getTableName();
2197            break;
2198          }
2199        }
2200      }
2201      if (tableName == null) {
2202        future.completeExceptionally(
2203          new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist."));
2204        return;
2205      }
2206      final TableName finalTableName = tableName;
2207      addListener(tableExists(finalTableName), (exists, err2) -> {
2208        if (err2 != null) {
2209          future.completeExceptionally(err2);
2210        } else if (!exists) {
2211          // if table does not exist, then just clone snapshot into new table.
2212          completeConditionalOnFuture(future,
2213            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2214        } else {
2215          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2216            if (err4 != null) {
2217              future.completeExceptionally(err4);
2218            } else if (!disabled) {
2219              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2220            } else {
2221              completeConditionalOnFuture(future,
2222                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2223            }
2224          });
2225        }
2226      });
2227    });
2228    return future;
2229  }
2230
2231  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2232    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2233    if (takeFailSafeSnapshot) {
2234      CompletableFuture<Void> future = new CompletableFuture<>();
2235      // Step.1 Take a snapshot of the current state
2236      String failSafeSnapshotSnapshotNameFormat =
2237        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2238          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2239      final String failSafeSnapshotSnapshotName =
2240        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2241          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2242          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2243      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2244      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2245        if (err != null) {
2246          future.completeExceptionally(err);
2247        } else {
2248          // Step.2 Restore snapshot
2249          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2250            (void2, err2) -> {
2251              if (err2 != null) {
2252                // Step.3.a Something went wrong during the restore and try to rollback.
2253                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2254                  restoreAcl, null), (void3, err3) -> {
2255                    if (err3 != null) {
2256                      future.completeExceptionally(err3);
2257                    } else {
2258                      // If fail to restore snapshot but rollback successfully, delete the
2259                      // restore-failsafe snapshot.
2260                      LOG.info(
2261                        "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2262                      addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> {
2263                        if (err4 != null) {
2264                          LOG.error("Unable to remove the failsafe snapshot: {}",
2265                            failSafeSnapshotSnapshotName, err4);
2266                        }
2267                        String msg =
2268                          "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot="
2269                            + failSafeSnapshotSnapshotName + " succeeded.";
2270                        LOG.error(msg);
2271                        future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2272                      });
2273                    }
2274                  });
2275              } else {
2276                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2277                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2278                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2279                  if (err3 != null) {
2280                    LOG.error(
2281                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2282                      err3);
2283                  }
2284                  future.complete(ret3);
2285                });
2286              }
2287            });
2288        }
2289      });
2290      return future;
2291    } else {
2292      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2293    }
2294  }
2295
2296  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2297    CompletableFuture<T> parentFuture) {
2298    addListener(parentFuture, (res, err) -> {
2299      if (err != null) {
2300        dependentFuture.completeExceptionally(err);
2301      } else {
2302        dependentFuture.complete(res);
2303      }
2304    });
2305  }
2306
2307  @Override
2308  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2309    boolean restoreAcl, String customSFT) {
2310    CompletableFuture<Void> future = new CompletableFuture<>();
2311    addListener(tableExists(tableName), (exists, err) -> {
2312      if (err != null) {
2313        future.completeExceptionally(err);
2314      } else if (exists) {
2315        future.completeExceptionally(new TableExistsException(tableName));
2316      } else {
2317        completeConditionalOnFuture(future,
2318          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2319      }
2320    });
2321    return future;
2322  }
2323
2324  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2325    boolean restoreAcl, String customSFT) {
2326    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2327      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2328    try {
2329      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2330    } catch (IllegalArgumentException e) {
2331      return failedFuture(e);
2332    }
2333    RestoreSnapshotRequest.Builder builder =
2334      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2335        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2336    if (customSFT != null) {
2337      builder.setCustomSFT(customSFT);
2338    }
2339    return waitProcedureResult(this.<Long> newMasterCaller()
2340      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2341        Long> call(controller, stub, builder.build(),
2342          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2343      .call(), result -> null);
2344  }
2345
2346  @Override
2347  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2348    return getCompletedSnapshots(null);
2349  }
2350
2351  @Override
2352  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2353    Preconditions.checkNotNull(pattern,
2354      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2355    return getCompletedSnapshots(pattern);
2356  }
2357
2358  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2359    return this.<List<SnapshotDescription>> newMasterCaller()
2360      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2361        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2362          GetCompletedSnapshotsRequest.newBuilder().build(),
2363          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2364          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2365      .call();
2366  }
2367
2368  @Override
2369  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2370    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2371      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2372    return getCompletedSnapshots(tableNamePattern, null);
2373  }
2374
2375  @Override
2376  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2377    Pattern snapshotNamePattern) {
2378    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2379      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2380    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2381      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2382    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2383  }
2384
2385  private CompletableFuture<List<SnapshotDescription>>
2386    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2387    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2388    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2389      if (err != null) {
2390        future.completeExceptionally(err);
2391        return;
2392      }
2393      if (tableNames == null || tableNames.size() <= 0) {
2394        future.complete(Collections.emptyList());
2395        return;
2396      }
2397      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2398        if (err2 != null) {
2399          future.completeExceptionally(err2);
2400          return;
2401        }
2402        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2403          future.complete(Collections.emptyList());
2404          return;
2405        }
2406        future.complete(snapshotDescList.stream()
2407          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2408          .collect(Collectors.toList()));
2409      });
2410    });
2411    return future;
2412  }
2413
2414  @Override
2415  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2416    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2417  }
2418
2419  @Override
2420  public CompletableFuture<Void> deleteSnapshots() {
2421    return internalDeleteSnapshots(null, null);
2422  }
2423
2424  @Override
2425  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2426    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2427      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2428    return internalDeleteSnapshots(null, snapshotNamePattern);
2429  }
2430
2431  @Override
2432  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2433    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2434      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2435    return internalDeleteSnapshots(tableNamePattern, null);
2436  }
2437
2438  @Override
2439  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2440    Pattern snapshotNamePattern) {
2441    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2442      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2443    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2444      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2445    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2446  }
2447
2448  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2449    Pattern snapshotNamePattern) {
2450    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2451    if (tableNamePattern == null) {
2452      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2453    } else {
2454      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2455    }
2456    CompletableFuture<Void> future = new CompletableFuture<>();
2457    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2458      if (err != null) {
2459        future.completeExceptionally(err);
2460        return;
2461      }
2462      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2463        future.complete(null);
2464        return;
2465      }
2466      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2467        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2468          if (e != null) {
2469            future.completeExceptionally(e);
2470          } else {
2471            future.complete(v);
2472          }
2473        });
2474    });
2475    return future;
2476  }
2477
2478  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2479    return this.<Void> newMasterCaller()
2480      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2481        controller, stub,
2482        DeleteSnapshotRequest.newBuilder()
2483          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2484        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2485      .call();
2486  }
2487
2488  @Override
2489  public CompletableFuture<Void> execProcedure(String signature, String instance,
2490    Map<String, String> props) {
2491    CompletableFuture<Void> future = new CompletableFuture<>();
2492    ProcedureDescription procDesc =
2493      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2494    addListener(this.<Long> newMasterCaller()
2495      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2496        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2497        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2498      .call(), (expectedTimeout, err) -> {
2499        if (err != null) {
2500          future.completeExceptionally(err);
2501          return;
2502        }
2503        TimerTask pollingTask = new TimerTask() {
2504          int tries = 0;
2505          long startTime = EnvironmentEdgeManager.currentTime();
2506          long endTime = startTime + expectedTimeout;
2507          long maxPauseTime = expectedTimeout / maxAttempts;
2508
2509          @Override
2510          public void run(Timeout timeout) throws Exception {
2511            if (EnvironmentEdgeManager.currentTime() < endTime) {
2512              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2513                if (err2 != null) {
2514                  future.completeExceptionally(err2);
2515                  return;
2516                }
2517                if (done) {
2518                  future.complete(null);
2519                } else {
2520                  // retry again after pauseTime.
2521                  long pauseTime =
2522                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2523                  pauseTime = Math.min(pauseTime, maxPauseTime);
2524                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2525                    TimeUnit.MICROSECONDS);
2526                }
2527              });
2528            } else {
2529              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2530                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2531            }
2532          }
2533        };
2534        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2535        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2536      });
2537    return future;
2538  }
2539
2540  @Override
2541  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2542    Map<String, String> props) {
2543    ProcedureDescription proDesc =
2544      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2545    return this.<byte[]> newMasterCaller()
2546      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2547        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2548        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2549        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2550      .call();
2551  }
2552
2553  @Override
2554  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2555    Map<String, String> props) {
2556    ProcedureDescription proDesc =
2557      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2558    return this.<Boolean> newMasterCaller()
2559      .action(
2560        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2561          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2562          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2563      .call();
2564  }
2565
2566  @Override
2567  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2568    return this.<Boolean> newMasterCaller().action(
2569      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2570        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2571        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2572      .call();
2573  }
2574
2575  @Override
2576  public CompletableFuture<String> getProcedures() {
2577    return this.<String> newMasterCaller()
2578      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2579        controller, stub, GetProceduresRequest.newBuilder().build(),
2580        (s, c, req, done) -> s.getProcedures(c, req, done),
2581        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2582      .call();
2583  }
2584
2585  @Override
2586  public CompletableFuture<String> getLocks() {
2587    return this.<String> newMasterCaller()
2588      .action(
2589        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2590          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2591          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2592      .call();
2593  }
2594
2595  @Override
2596  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2597    boolean offload) {
2598    return this.<Void> newMasterCaller()
2599      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2600        DecommissionRegionServersResponse, Void> call(controller, stub,
2601          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2602          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2603      .call();
2604  }
2605
2606  @Override
2607  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2608    return this.<List<ServerName>> newMasterCaller()
2609      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2610        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2611          ListDecommissionedRegionServersRequest.newBuilder().build(),
2612          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2613          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2614            .collect(Collectors.toList())))
2615      .call();
2616  }
2617
2618  @Override
2619  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2620    List<byte[]> encodedRegionNames) {
2621    return this.<Void> newMasterCaller()
2622      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2623        RecommissionRegionServerResponse, Void> call(controller, stub,
2624          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2625          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2626      .call();
2627  }
2628
2629  /**
2630   * Get the region location for the passed region name. The region name may be a full region name
2631   * or encoded region name. If the region does not found, then it'll throw an
2632   * UnknownRegionException wrapped by a {@link CompletableFuture}
2633   * @param regionNameOrEncodedRegionName region name or encoded region name
2634   * @return region location, wrapped by a {@link CompletableFuture}
2635   */
2636  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2637    if (regionNameOrEncodedRegionName == null) {
2638      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2639    }
2640
2641    CompletableFuture<Optional<HRegionLocation>> future;
2642    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2643      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2644      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2645        // old format encodedName, should be meta region
2646        future = connection.registry.getMetaRegionLocations()
2647          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2648            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2649      } else {
2650        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2651          regionNameOrEncodedRegionName);
2652      }
2653    } else {
2654      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2655      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2656      RegionInfo regionInfo;
2657      try {
2658        regionInfo =
2659          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2660      } catch (IOException ioe) {
2661        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2662      }
2663
2664      if (regionInfo.isMetaRegion()) {
2665        future = connection.registry.getMetaRegionLocations()
2666          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2667            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2668            .findFirst());
2669      } else {
2670        future =
2671          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2672      }
2673    }
2674
2675    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2676    addListener(future, (location, err) -> {
2677      if (err != null) {
2678        returnedFuture.completeExceptionally(err);
2679        return;
2680      }
2681      if (!location.isPresent() || location.get().getRegion() == null) {
2682        returnedFuture.completeExceptionally(
2683          new UnknownRegionException("Invalid region name or encoded region name: "
2684            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2685      } else {
2686        returnedFuture.complete(location.get());
2687      }
2688    });
2689    return returnedFuture;
2690  }
2691
2692  /**
2693   * Get the region info for the passed region name. The region name may be a full region name or
2694   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2695   * wrapped by a {@link CompletableFuture}
2696   * @return region info, wrapped by a {@link CompletableFuture}
2697   */
2698  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2699    if (regionNameOrEncodedRegionName == null) {
2700      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2701    }
2702
2703    if (
2704      Bytes.equals(regionNameOrEncodedRegionName,
2705        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2706        || Bytes.equals(regionNameOrEncodedRegionName,
2707          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2708    ) {
2709      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2710    }
2711
2712    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2713    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2714      if (err != null) {
2715        future.completeExceptionally(err);
2716      } else {
2717        future.complete(location.getRegion());
2718      }
2719    });
2720    return future;
2721  }
2722
2723  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2724    if (numRegions < 3) {
2725      throw new IllegalArgumentException("Must create at least three regions");
2726    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2727      throw new IllegalArgumentException("Start key must be smaller than end key");
2728    }
2729    if (numRegions == 3) {
2730      return new byte[][] { startKey, endKey };
2731    }
2732    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2733    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2734      throw new IllegalArgumentException("Unable to split key range into enough regions");
2735    }
2736    return splitKeys;
2737  }
2738
2739  private void verifySplitKeys(byte[][] splitKeys) {
2740    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2741    // Verify there are no duplicate split keys
2742    byte[] lastKey = null;
2743    for (byte[] splitKey : splitKeys) {
2744      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2745        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2746      }
2747      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2748        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2749          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2750      }
2751      lastKey = splitKey;
2752    }
2753  }
2754
2755  private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> {
2756
2757    abstract void onFinished();
2758
2759    abstract void onError(Throwable error);
2760
2761    @Override
2762    public void accept(T value, Throwable error) {
2763      if (error != null) {
2764        onError(error);
2765        return;
2766      }
2767      onFinished();
2768    }
2769  }
2770
2771  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2772    protected final TableName tableName;
2773
2774    TableProcedureBiConsumer(TableName tableName) {
2775      this.tableName = tableName;
2776    }
2777
2778    abstract String getOperationType();
2779
2780    String getDescription() {
2781      return "Operation: " + getOperationType() + ", " + "Table Name: "
2782        + tableName.getNameWithNamespaceInclAsString();
2783    }
2784
2785    @Override
2786    void onFinished() {
2787      LOG.info(getDescription() + " completed");
2788    }
2789
2790    @Override
2791    void onError(Throwable error) {
2792      LOG.info(getDescription() + " failed with " + error.getMessage());
2793    }
2794  }
2795
2796  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2797    protected final String namespaceName;
2798
2799    NamespaceProcedureBiConsumer(String namespaceName) {
2800      this.namespaceName = namespaceName;
2801    }
2802
2803    abstract String getOperationType();
2804
2805    String getDescription() {
2806      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2807    }
2808
2809    @Override
2810    void onFinished() {
2811      LOG.info("{} completed", getDescription());
2812    }
2813
2814    @Override
2815    void onError(Throwable error) {
2816      LOG.info("{} failed with {}", getDescription(), error.getMessage());
2817    }
2818  }
2819
2820  private static class RestoreBackupSystemTableProcedureBiConsumer extends ProcedureBiConsumer {
2821
2822    @Override
2823    void onFinished() {
2824      LOG.info("RestoreBackupSystemTableProcedure completed");
2825    }
2826
2827    @Override
2828    void onError(Throwable error) {
2829      LOG.info("RestoreBackupSystemTableProcedure failed with {}", error.getMessage());
2830    }
2831  }
2832
2833  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2834
2835    CreateTableProcedureBiConsumer(TableName tableName) {
2836      super(tableName);
2837    }
2838
2839    @Override
2840    String getOperationType() {
2841      return "CREATE";
2842    }
2843  }
2844
2845  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2846
2847    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2848      super(tableName);
2849    }
2850
2851    @Override
2852    String getOperationType() {
2853      return "MODIFY";
2854    }
2855  }
2856
2857  private static class ReopenTableRegionsProcedureBiConsumer extends TableProcedureBiConsumer {
2858
2859    ReopenTableRegionsProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2860      super(tableName);
2861    }
2862
2863    @Override
2864    String getOperationType() {
2865      return "REOPEN_TABLE_REGIONS";
2866    }
2867  }
2868
2869  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2870    extends TableProcedureBiConsumer {
2871
2872    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2873      super(tableName);
2874    }
2875
2876    @Override
2877    String getOperationType() {
2878      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2879    }
2880  }
2881
2882  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2883
2884    DeleteTableProcedureBiConsumer(TableName tableName) {
2885      super(tableName);
2886    }
2887
2888    @Override
2889    String getOperationType() {
2890      return "DELETE";
2891    }
2892
2893    @Override
2894    void onFinished() {
2895      connection.getLocator().clearCache(this.tableName);
2896      super.onFinished();
2897    }
2898  }
2899
2900  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2901
2902    TruncateTableProcedureBiConsumer(TableName tableName) {
2903      super(tableName);
2904    }
2905
2906    @Override
2907    String getOperationType() {
2908      return "TRUNCATE";
2909    }
2910  }
2911
2912  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2913
2914    EnableTableProcedureBiConsumer(TableName tableName) {
2915      super(tableName);
2916    }
2917
2918    @Override
2919    String getOperationType() {
2920      return "ENABLE";
2921    }
2922  }
2923
2924  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2925
2926    DisableTableProcedureBiConsumer(TableName tableName) {
2927      super(tableName);
2928    }
2929
2930    @Override
2931    String getOperationType() {
2932      return "DISABLE";
2933    }
2934  }
2935
2936  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2937
2938    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2939      super(tableName);
2940    }
2941
2942    @Override
2943    String getOperationType() {
2944      return "ADD_COLUMN_FAMILY";
2945    }
2946  }
2947
2948  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2949
2950    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2951      super(tableName);
2952    }
2953
2954    @Override
2955    String getOperationType() {
2956      return "DELETE_COLUMN_FAMILY";
2957    }
2958  }
2959
2960  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2961
2962    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2963      super(tableName);
2964    }
2965
2966    @Override
2967    String getOperationType() {
2968      return "MODIFY_COLUMN_FAMILY";
2969    }
2970  }
2971
2972  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2973    extends TableProcedureBiConsumer {
2974
2975    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2976      super(tableName);
2977    }
2978
2979    @Override
2980    String getOperationType() {
2981      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2982    }
2983  }
2984
2985  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2986
2987    FlushTableProcedureBiConsumer(TableName tableName) {
2988      super(tableName);
2989    }
2990
2991    @Override
2992    String getOperationType() {
2993      return "FLUSH";
2994    }
2995  }
2996
2997  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2998
2999    CreateNamespaceProcedureBiConsumer(String namespaceName) {
3000      super(namespaceName);
3001    }
3002
3003    @Override
3004    String getOperationType() {
3005      return "CREATE_NAMESPACE";
3006    }
3007  }
3008
3009  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
3010
3011    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
3012      super(namespaceName);
3013    }
3014
3015    @Override
3016    String getOperationType() {
3017      return "DELETE_NAMESPACE";
3018    }
3019  }
3020
3021  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
3022
3023    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
3024      super(namespaceName);
3025    }
3026
3027    @Override
3028    String getOperationType() {
3029      return "MODIFY_NAMESPACE";
3030    }
3031  }
3032
3033  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3034
3035    MergeTableRegionProcedureBiConsumer(TableName tableName) {
3036      super(tableName);
3037    }
3038
3039    @Override
3040    String getOperationType() {
3041      return "MERGE_REGIONS";
3042    }
3043  }
3044
3045  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3046
3047    SplitTableRegionProcedureBiConsumer(TableName tableName) {
3048      super(tableName);
3049    }
3050
3051    @Override
3052    String getOperationType() {
3053      return "SPLIT_REGION";
3054    }
3055  }
3056
3057  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3058
3059    TruncateRegionProcedureBiConsumer(TableName tableName) {
3060      super(tableName);
3061    }
3062
3063    @Override
3064    String getOperationType() {
3065      return "TRUNCATE_REGION";
3066    }
3067  }
3068
3069  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
3070    SnapshotProcedureBiConsumer(TableName tableName) {
3071      super(tableName);
3072    }
3073
3074    @Override
3075    String getOperationType() {
3076      return "SNAPSHOT";
3077    }
3078  }
3079
3080  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> {
3081    private final String peerId;
3082    private final Supplier<String> getOperation;
3083
3084    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
3085      this.peerId = peerId;
3086      this.getOperation = getOperation;
3087    }
3088
3089    String getDescription() {
3090      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
3091    }
3092
3093    @Override
3094    void onFinished() {
3095      LOG.info("{} completed", getDescription());
3096    }
3097
3098    @Override
3099    void onError(Throwable error) {
3100      LOG.info("{} failed with {}", getDescription(), error.getMessage());
3101    }
3102  }
3103
  /**
   * Callback for the roll-all-WAL-writers procedure: logs completion at INFO level and failure
   * at WARN level. The procedure's result value is not inspected here.
   */
  private static final class RollAllWALWritersBiConsumer
    extends ProcedureBiConsumer<Map<ServerName, Long>> {

    @Override
    void onFinished() {
      LOG.info("Rolling all WAL writers completed");
    }

    @Override
    void onError(Throwable error) {
      // Only the error message is logged, not the stack trace.
      LOG.warn("Rolling all WAL writers failed with {}", error.getMessage());
    }
  }
3117
3118  private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture,
3119    Converter<T, ByteString> converter) {
3120    CompletableFuture<T> future = new CompletableFuture<>();
3121    addListener(procFuture, (procId, error) -> {
3122      if (error != null) {
3123        future.completeExceptionally(error);
3124        return;
3125      }
3126      getProcedureResult(procId, converter, future, 0);
3127    });
3128    return future;
3129  }
3130
3131  private <T> void getProcedureResult(long procId, Converter<T, ByteString> converter,
3132    CompletableFuture<T> future, int retries) {
3133    addListener(
3134      this.<GetProcedureResultResponse> newMasterCaller()
3135        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3136          GetProcedureResultResponse> call(controller, stub,
3137            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3138            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3139        .call(),
3140      (response, error) -> {
3141        if (error != null) {
3142          Throwable exc = ConnectionUtils.translateException(error);
3143          if (exc instanceof DoNotRetryIOException) {
3144            // stop retrying on DNRIOE
3145            future.completeExceptionally(exc);
3146          } else {
3147            LOG.warn("failed to get the procedure result procId={}", procId, exc);
3148            retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3149              ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3150          }
3151          return;
3152        }
3153        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3154          retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3155            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3156          return;
3157        }
3158        if (response.hasException()) {
3159          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3160          future.completeExceptionally(ioe);
3161        } else {
3162          try {
3163            future.complete(converter.convert(response.getResult()));
3164          } catch (IOException e) {
3165            future.completeExceptionally(e);
3166          }
3167        }
3168      });
3169  }
3170
3171  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3172    CompletableFuture<T> future = new CompletableFuture<>();
3173    future.completeExceptionally(error);
3174    return future;
3175  }
3176
3177  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3178    if (error != null) {
3179      future.completeExceptionally(error);
3180      return true;
3181    }
3182    return false;
3183  }
3184
3185  @Override
3186  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3187    return getClusterMetrics(EnumSet.allOf(Option.class));
3188  }
3189
3190  @Override
3191  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3192    return this.<ClusterMetrics> newMasterCaller()
3193      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3194        ClusterMetrics> call(controller, stub,
3195          RequestConverter.buildGetClusterStatusRequest(options),
3196          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3197          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3198      .call();
3199  }
3200
3201  @Override
3202  public CompletableFuture<Void> shutdown() {
3203    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3204      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3205        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3206        resp -> null))
3207      .call();
3208  }
3209
3210  @Override
3211  public CompletableFuture<Void> stopMaster() {
3212    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3213      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3214        controller, stub, StopMasterRequest.newBuilder().build(),
3215        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3216      .call();
3217  }
3218
3219  @Override
3220  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3221    StopServerRequest request = RequestConverter
3222      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3223    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3224      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3225        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3226        resp -> null))
3227      .serverName(serverName).call();
3228  }
3229
3230  @Override
3231  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3232    return this.<Void> newAdminCaller()
3233      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3234        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3235          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3236      .serverName(serverName).call();
3237  }
3238
3239  @Override
3240  public CompletableFuture<Void> updateConfiguration() {
3241    CompletableFuture<Void> future = new CompletableFuture<Void>();
3242    addListener(
3243      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3244      (status, err) -> {
3245        if (err != null) {
3246          future.completeExceptionally(err);
3247        } else {
3248          List<CompletableFuture<Void>> futures = new ArrayList<>();
3249          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3250          futures.add(updateConfiguration(status.getMasterName()));
3251          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3252          addListener(
3253            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3254            (result, err2) -> {
3255              if (err2 != null) {
3256                future.completeExceptionally(err2);
3257              } else {
3258                future.complete(result);
3259              }
3260            });
3261        }
3262      });
3263    return future;
3264  }
3265
3266  @Override
3267  public CompletableFuture<Void> updateConfiguration(String groupName) {
3268    CompletableFuture<Void> future = new CompletableFuture<Void>();
3269    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3270      if (err != null) {
3271        future.completeExceptionally(err);
3272      } else if (rsGroupInfo == null) {
3273        future.completeExceptionally(
3274          new IllegalArgumentException("Group does not exist: " + groupName));
3275      } else {
3276        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3277          if (err2 != null) {
3278            future.completeExceptionally(err2);
3279          } else {
3280            List<CompletableFuture<Void>> futures = new ArrayList<>();
3281            List<ServerName> groupServers = status.getServersName().stream()
3282              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3283            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3284            addListener(
3285              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3286              (result, err3) -> {
3287                if (err3 != null) {
3288                  future.completeExceptionally(err3);
3289                } else {
3290                  future.complete(result);
3291                }
3292              });
3293          }
3294        });
3295      }
3296    });
3297    return future;
3298  }
3299
3300  @Override
3301  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3302    return this.<Void> newAdminCaller()
3303      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3304        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3305          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3306      .serverName(serverName).call();
3307  }
3308
  /**
   * Submits the roll-all-WAL-writers request through {@code procedureCall}, using a fresh nonce
   * from {@code ng}.
   * <p>
   * The result bytes are parsed as a {@code LastHighestWalFilenum} message and its file-number
   * map is re-keyed by {@link ServerName} into an unmodifiable map.
   * @return a future holding the per-server values from the parsed result
   */
  @Override
  public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() {
    return this
      .<RollAllWALWritersRequest, RollAllWALWritersResponse,
        Map<ServerName,
          Long>> procedureCall(
            RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()),
            // The response only carries the procedure id; the map comes from the result bytes.
            (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(),
            result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap()
              .entrySet().stream().collect(Collectors
                .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)),
            new RollAllWALWritersBiConsumer());
  }
3322
3323  @Override
3324  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3325    return this.<Void> newAdminCaller()
3326      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3327        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3328          RequestConverter.buildClearCompactionQueuesRequest(queues),
3329          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3330      .serverName(serverName).call();
3331  }
3332
3333  @Override
3334  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3335    return this.<List<SecurityCapability>> newMasterCaller()
3336      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3337        List<SecurityCapability>> call(controller, stub,
3338          SecurityCapabilitiesRequest.newBuilder().build(),
3339          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3340          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3341      .call();
3342  }
3343
3344  @Override
3345  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3346    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3347  }
3348
3349  @Override
3350  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3351    TableName tableName) {
3352    Preconditions.checkNotNull(tableName,
3353      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3354    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3355  }
3356
3357  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3358    ServerName serverName) {
3359    return this.<List<RegionMetrics>> newAdminCaller()
3360      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3361        List<RegionMetrics>> adminCall(controller, stub, request,
3362          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3363          RegionMetricsBuilder::toRegionMetrics))
3364      .serverName(serverName).call();
3365  }
3366
3367  @Override
3368  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3369    return this.<Boolean> newMasterCaller()
3370      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3371        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3372          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3373          resp -> resp.getInMaintenanceMode()))
3374      .call();
3375  }
3376
  /**
   * Get the compaction state of a table, either for its MOB region or for its normal regions.
   * <ul>
   * <li>{@code MOB}: the MOB region info of the table is sent in a getRegionInfo request to the
   * active master; the reported compaction state is returned, or {@code NONE} when the response
   * carries none.</li>
   * <li>{@code NORMAL}: every located region that has a server, has a region info and is not
   * offline is queried through {@link #getCompactionStateForRegion(byte[])} and the per-region
   * states are merged into one table-level state.</li>
   * </ul>
   * @param tableName   table to inspect
   * @param compactType which kind of compaction to report on
   * @return a future completed with the merged state, or completed exceptionally with the first
   *         failure that is observed
   * @throws IllegalArgumentException synchronously (not through the future) when
   *                                  {@code compactType} is neither {@code MOB} nor
   *                                  {@code NORMAL}
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
    CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        // The request for the MOB region is sent to whichever server the registry reports as the
        // active master.
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
              GetRegionInfoResponse> adminCall(controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                // A response without a compaction state is reported as NONE.
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // Collected from the per-region callbacks below, which may run on different threads;
          // only states other than MAJOR_AND_MINOR end up here.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          // Skip locations without a server, without a region, or whose region is offline.
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // If any region compaction state is MAJOR_AND_MINOR
                // the table compaction state is MAJOR_AND_MINOR, too.
                if (err2 != null) {
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          // The futures in the list are the ones returned by whenComplete, so this listener runs
          // only after every per-region callback above has executed. err3 is not inspected here:
          // a per-region failure has already completed `future` exceptionally in that callback.
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If future not completed, check all regions's compaction state
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                // Merge: seeing both MAJOR and MINOR yields MAJOR_AND_MINOR; otherwise the last
                // non-NONE state seen wins, and NONE is the result when no region reports one.
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // Only reached with an incomplete future when no MAJOR/MINOR mix was found.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        // Thrown to the caller directly rather than reported through the returned future.
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }
3471
3472  @Override
3473  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3474    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3475    addListener(getRegionLocation(regionName), (location, err) -> {
3476      if (err != null) {
3477        future.completeExceptionally(err);
3478        return;
3479      }
3480      ServerName serverName = location.getServerName();
3481      if (serverName == null) {
3482        future
3483          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3484        return;
3485      }
3486      addListener(
3487        this.<GetRegionInfoResponse> newAdminCaller()
3488          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3489            GetRegionInfoResponse> adminCall(controller, stub,
3490              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3491                true),
3492              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3493          .serverName(serverName).call(),
3494        (resp2, err2) -> {
3495          if (err2 != null) {
3496            future.completeExceptionally(err2);
3497          } else {
3498            if (resp2.hasCompactionState()) {
3499              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3500            } else {
3501              future.complete(CompactionState.NONE);
3502            }
3503          }
3504        });
3505    });
3506    return future;
3507  }
3508
3509  @Override
3510  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3511    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3512      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3513    return this.<Optional<Long>> newMasterCaller()
3514      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3515        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3516          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3517          ProtobufUtil::toOptionalTimestamp))
3518      .call();
3519  }
3520
3521  @Override
3522  public CompletableFuture<Optional<Long>>
3523    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3524    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3525    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3526    addListener(getRegionInfo(regionName), (region, err) -> {
3527      if (err != null) {
3528        future.completeExceptionally(err);
3529        return;
3530      }
3531      MajorCompactionTimestampForRegionRequest.Builder builder =
3532        MajorCompactionTimestampForRegionRequest.newBuilder();
3533      builder.setRegion(
3534        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3535      addListener(this.<Optional<Long>> newMasterCaller()
3536        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3537          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3538            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3539            ProtobufUtil::toOptionalTimestamp))
3540        .call(), (timestamp, err2) -> {
3541          if (err2 != null) {
3542            future.completeExceptionally(err2);
3543          } else {
3544            future.complete(timestamp);
3545          }
3546        });
3547    });
3548    return future;
3549  }
3550
3551  @Override
3552  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3553    List<String> serverNamesList) {
3554    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3555    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3556      if (err != null) {
3557        future.completeExceptionally(err);
3558        return;
3559      }
3560      // Accessed by multiple threads.
3561      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3562      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3563      serverNames.stream().forEach(serverName -> {
3564        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3565          if (err2 != null) {
3566            future.completeExceptionally(unwrapCompletionException(err2));
3567          } else {
3568            serverStates.put(serverName, serverState);
3569          }
3570        }));
3571      });
3572      addListener(
3573        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3574        (ret, err3) -> {
3575          if (!future.isCompletedExceptionally()) {
3576            if (err3 != null) {
3577              future.completeExceptionally(err3);
3578            } else {
3579              future.complete(serverStates);
3580            }
3581          }
3582        });
3583    });
3584    return future;
3585  }
3586
3587  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3588    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3589    if (serverNamesList.isEmpty()) {
3590      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3591        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3592      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3593        if (err != null) {
3594          future.completeExceptionally(err);
3595        } else {
3596          future.complete(clusterMetrics.getServersName());
3597        }
3598      });
3599      return future;
3600    } else {
3601      List<ServerName> serverList = new ArrayList<>();
3602      for (String regionServerName : serverNamesList) {
3603        ServerName serverName = null;
3604        try {
3605          serverName = ServerName.valueOf(regionServerName);
3606        } catch (Exception e) {
3607          future.completeExceptionally(
3608            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3609        }
3610        if (serverName == null) {
3611          future.completeExceptionally(
3612            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3613        } else {
3614          serverList.add(serverName);
3615        }
3616      }
3617      future.complete(serverList);
3618    }
3619    return future;
3620  }
3621
3622  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3623    return this.<Boolean> newAdminCaller().serverName(serverName)
3624      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3625        Boolean> adminCall(controller, stub,
3626          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3627          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3628      .call();
3629  }
3630
3631  @Override
3632  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3633    return this.<Boolean> newMasterCaller()
3634      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3635        Boolean> call(controller, stub,
3636          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3637          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3638          (resp) -> resp.getPrevBalanceValue()))
3639      .call();
3640  }
3641
3642  @Override
3643  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3644    return this.<BalanceResponse> newMasterCaller()
3645      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3646        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3647          (s, c, req, done) -> s.balance(c, req, done),
3648          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3649      .call();
3650  }
3651
3652  @Override
3653  public CompletableFuture<Boolean> isBalancerEnabled() {
3654    return this.<Boolean> newMasterCaller()
3655      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3656        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3657          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3658      .call();
3659  }
3660
3661  @Override
3662  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3663    return this.<Boolean> newMasterCaller()
3664      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3665        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3666          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3667          (resp) -> resp.getPrevNormalizerValue()))
3668      .call();
3669  }
3670
3671  @Override
3672  public CompletableFuture<Boolean> isNormalizerEnabled() {
3673    return this.<Boolean> newMasterCaller()
3674      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3675        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3676          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3677      .call();
3678  }
3679
3680  @Override
3681  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3682    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3683  }
3684
3685  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3686    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3687      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3688  }
3689
3690  @Override
3691  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3692    return this.<Boolean> newMasterCaller()
3693      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3694        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3695          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3696          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3697          (resp) -> resp.getPrevValue()))
3698      .call();
3699  }
3700
3701  @Override
3702  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3703    return this.<Boolean> newMasterCaller()
3704      .action(
3705        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3706          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3707            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3708      .call();
3709  }
3710
3711  @Override
3712  public CompletableFuture<Boolean> runCleanerChore() {
3713    return this.<Boolean> newMasterCaller()
3714      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3715        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3716          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3717          (resp) -> resp.getCleanerChoreRan()))
3718      .call();
3719  }
3720
3721  @Override
3722  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3723    return this.<Boolean> newMasterCaller()
3724      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3725        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3726          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3727      .call();
3728  }
3729
3730  @Override
3731  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3732    return this.<Boolean> newMasterCaller()
3733      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3734        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3735          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3736          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3737      .call();
3738  }
3739
3740  @Override
3741  public CompletableFuture<Integer> runCatalogJanitor() {
3742    return this.<Integer> newMasterCaller()
3743      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3744        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3745          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3746      .call();
3747  }
3748
3749  @Override
3750  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3751    ServiceCaller<S, R> callable) {
3752    MasterCoprocessorRpcChannelImpl channel =
3753      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3754    S stub = stubMaker.apply(channel);
3755    CompletableFuture<R> future = new CompletableFuture<>();
3756    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3757    callable.call(stub, controller, resp -> {
3758      if (controller.failed()) {
3759        future.completeExceptionally(controller.getFailed());
3760      } else {
3761        future.complete(resp);
3762      }
3763    });
3764    return future;
3765  }
3766
3767  @Override
3768  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3769    ServiceCaller<S, R> callable, ServerName serverName) {
3770    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3771      this.<Message> newServerCaller().serverName(serverName));
3772    S stub = stubMaker.apply(channel);
3773    CompletableFuture<R> future = new CompletableFuture<>();
3774    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3775    callable.call(stub, controller, resp -> {
3776      if (controller.failed()) {
3777        future.completeExceptionally(controller.getFailed());
3778      } else {
3779        future.complete(resp);
3780      }
3781    });
3782    return future;
3783  }
3784
3785  @Override
3786  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3787    return this.<List<ServerName>> newMasterCaller()
3788      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3789        List<ServerName>> call(controller, stub,
3790          RequestConverter.buildClearDeadServersRequest(servers),
3791          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3792          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3793      .call();
3794  }
3795
  /**
   * Create a server request caller builder from the connection's caller factory, pre-configured
   * with this admin's rpc timeout, operation timeout, pause, pause-for-server-overloaded, max
   * attempts and {@code startLogErrorsCnt} settings. The target server is not set here; callers
   * add it themselves.
   * @param <T> the result type of the call to be built
   */
  <T> ServerRequestCallerBuilder<T> newServerCaller() {
    return this.connection.callerFactory.<T> serverRequest()
      // all time-based settings are kept in nanoseconds on this class
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS)
      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
  }
3804
3805  @Override
3806  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3807    if (tableName == null) {
3808      return failedFuture(new IllegalArgumentException("Table name is null"));
3809    }
3810    CompletableFuture<Void> future = new CompletableFuture<>();
3811    addListener(tableExists(tableName), (exist, err) -> {
3812      if (err != null) {
3813        future.completeExceptionally(err);
3814        return;
3815      }
3816      if (!exist) {
3817        future.completeExceptionally(new TableNotFoundException(
3818          "Table '" + tableName.getNameAsString() + "' does not exists."));
3819        return;
3820      }
3821      addListener(getTableSplits(tableName), (splits, err1) -> {
3822        if (err1 != null) {
3823          future.completeExceptionally(err1);
3824        } else {
3825          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3826            if (err2 != null) {
3827              future.completeExceptionally(err2);
3828            } else {
3829              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3830                if (err3 != null) {
3831                  future.completeExceptionally(err3);
3832                } else {
3833                  future.complete(result3);
3834                }
3835              });
3836            }
3837          });
3838        }
3839      });
3840    });
3841    return future;
3842  }
3843
3844  @Override
3845  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3846    if (tableName == null) {
3847      return failedFuture(new IllegalArgumentException("Table name is null"));
3848    }
3849    CompletableFuture<Void> future = new CompletableFuture<>();
3850    addListener(tableExists(tableName), (exist, err) -> {
3851      if (err != null) {
3852        future.completeExceptionally(err);
3853        return;
3854      }
3855      if (!exist) {
3856        future.completeExceptionally(new TableNotFoundException(
3857          "Table '" + tableName.getNameAsString() + "' does not exists."));
3858        return;
3859      }
3860      addListener(setTableReplication(tableName, false), (result, err2) -> {
3861        if (err2 != null) {
3862          future.completeExceptionally(err2);
3863        } else {
3864          future.complete(result);
3865        }
3866      });
3867    });
3868    return future;
3869  }
3870
3871  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3872    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3873    addListener(
3874      getRegions(tableName).thenApply(regions -> regions.stream()
3875        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3876      (regions, err2) -> {
3877        if (err2 != null) {
3878          future.completeExceptionally(err2);
3879          return;
3880        }
3881        if (regions.size() == 1) {
3882          future.complete(null);
3883        } else {
3884          byte[][] splits = new byte[regions.size() - 1][];
3885          for (int i = 1; i < regions.size(); i++) {
3886            splits[i - 1] = regions.get(i).getStartKey();
3887          }
3888          future.complete(splits);
3889        }
3890      });
3891    return future;
3892  }
3893
3894  /**
3895   * Connect to peer and check the table descriptor on peer:
3896   * <ol>
3897   * <li>Create the same table on peer when not exist.</li>
3898   * <li>Throw an exception if the table already has replication enabled on any of the column
3899   * families.</li>
3900   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3901   * </ol>
3902   * @param tableName name of the table to sync to the peer
3903   * @param splits    table split keys
3904   */
3905  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3906    byte[][] splits) {
3907    CompletableFuture<Void> future = new CompletableFuture<>();
3908    addListener(listReplicationPeers(), (peers, err) -> {
3909      if (err != null) {
3910        future.completeExceptionally(err);
3911        return;
3912      }
3913      if (peers == null || peers.size() <= 0) {
3914        future.completeExceptionally(
3915          new IllegalArgumentException("Found no peer cluster for replication."));
3916        return;
3917      }
3918      List<CompletableFuture<Void>> futures = new ArrayList<>();
3919      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3920        .forEach(peer -> {
3921          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3922        });
3923      addListener(
3924        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3925        (result, err2) -> {
3926          if (err2 != null) {
3927            future.completeExceptionally(err2);
3928          } else {
3929            future.complete(result);
3930          }
3931        });
3932    });
3933    return future;
3934  }
3935
3936  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3937    ReplicationPeerDescription peer) {
3938    Configuration peerConf;
3939    try {
3940      peerConf = ReplicationPeerConfigUtil
3941        .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
3942    } catch (IOException e) {
3943      return failedFuture(e);
3944    }
3945    URI connectionUri =
3946      ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
3947    CompletableFuture<Void> future = new CompletableFuture<>();
3948    addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
3949      if (err != null) {
3950        future.completeExceptionally(err);
3951        return;
3952      }
3953      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3954        if (err1 != null) {
3955          future.completeExceptionally(err1);
3956          return;
3957        }
3958        AsyncAdmin peerAdmin = conn.getAdmin();
3959        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3960          if (err2 != null) {
3961            future.completeExceptionally(err2);
3962            return;
3963          }
3964          if (!exist) {
3965            CompletableFuture<Void> createTableFuture = null;
3966            if (splits == null) {
3967              createTableFuture = peerAdmin.createTable(tableDesc);
3968            } else {
3969              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3970            }
3971            addListener(createTableFuture, (result, err3) -> {
3972              if (err3 != null) {
3973                future.completeExceptionally(err3);
3974              } else {
3975                future.complete(result);
3976              }
3977            });
3978          } else {
3979            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3980              (result, err4) -> {
3981                if (err4 != null) {
3982                  future.completeExceptionally(err4);
3983                } else {
3984                  future.complete(result);
3985                }
3986              });
3987          }
3988        });
3989      });
3990    });
3991    return future;
3992  }
3993
3994  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3995    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3996    CompletableFuture<Void> future = new CompletableFuture<>();
3997    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3998      if (err != null) {
3999        future.completeExceptionally(err);
4000        return;
4001      }
4002      if (peerTableDesc == null) {
4003        future.completeExceptionally(
4004          new IllegalArgumentException("Failed to get table descriptor for table "
4005            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
4006        return;
4007      }
4008      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
4009        future.completeExceptionally(new IllegalArgumentException(
4010          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
4011            + ", but the table descriptors are not same when compared with source cluster."
4012            + " Thus can not enable the table's replication switch."));
4013        return;
4014      }
4015      future.complete(null);
4016    });
4017    return future;
4018  }
4019
4020  /**
4021   * Set the table's replication switch if the table's replication switch is already not set.
4022   * @param tableName name of the table
4023   * @param enableRep is replication switch enable or disable
4024   */
4025  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
4026    CompletableFuture<Void> future = new CompletableFuture<>();
4027    addListener(getDescriptor(tableName), (tableDesc, err) -> {
4028      if (err != null) {
4029        future.completeExceptionally(err);
4030        return;
4031      }
4032      if (!tableDesc.matchReplicationScope(enableRep)) {
4033        int scope =
4034          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
4035        TableDescriptor newTableDesc =
4036          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
4037        addListener(modifyTable(newTableDesc), (result, err2) -> {
4038          if (err2 != null) {
4039            future.completeExceptionally(err2);
4040          } else {
4041            future.complete(result);
4042          }
4043        });
4044      } else {
4045        future.complete(null);
4046      }
4047    });
4048    return future;
4049  }
4050
4051  @Override
4052  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
4053    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
4054    request.setPeerId(peerId);
4055    return this.<Boolean> newMasterCaller()
4056      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
4057        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
4058          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
4059          resp -> resp.getIsEnabled()))
4060      .call();
4061  }
4062
4063  private void waitUntilAllReplicationPeerModificationProceduresDone(
4064    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
4065    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
4066      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
4067        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
4068          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
4069            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
4070            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
4071            resp -> resp.getProcedureList()))
4072        .call();
4073    addListener(callFuture, (r, e) -> {
4074      if (e != null) {
4075        future.completeExceptionally(e);
4076      } else if (r.isEmpty()) {
4077        // we are done
4078        future.complete(prevOn);
4079      } else {
4080        // retry later to see if the procedures are done
4081        retryTimer.newTimeout(
4082          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
4083          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
4084      }
4085    });
4086  }
4087
4088  @Override
4089  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
4090    boolean drainProcedures) {
4091    ReplicationPeerModificationSwitchRequest request =
4092      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
4093    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
4094      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
4095        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
4096          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
4097          resp -> resp.getPreviousValue()))
4098      .call();
4099    // if we do not need to wait all previous peer modification procedure done, or we are enabling
4100    // peer modification, just return here.
4101    if (!drainProcedures || on) {
4102      return callFuture;
4103    }
4104    // otherwise we need to wait until all previous peer modification procedure done
4105    CompletableFuture<Boolean> future = new CompletableFuture<>();
4106    addListener(callFuture, (prevOn, err) -> {
4107      if (err != null) {
4108        future.completeExceptionally(err);
4109        return;
4110      }
4111      // even if the previous state is disabled, we still need to wait here, as there could be
4112      // another client thread which called this method just before us and have already changed the
4113      // state to off, but there are still peer modification procedures not finished, so we should
4114      // also wait here.
4115      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
4116    });
4117    return future;
4118  }
4119
4120  @Override
4121  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
4122    return this.<Boolean> newMasterCaller()
4123      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
4124        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
4125          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
4126          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
4127          (resp) -> resp.getEnabled()))
4128      .call();
4129  }
4130
4131  @Override
4132  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
4133    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
4134    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
4135      if (err != null) {
4136        future.completeExceptionally(err);
4137        return;
4138      }
4139      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
4140        locations.stream().filter(l -> l.getRegion() != null)
4141          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
4142          .collect(Collectors.groupingBy(l -> l.getServerName(),
4143            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
4144      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
4145      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
4146      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
4147        futures
4148          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
4149            if (err2 != null) {
4150              future.completeExceptionally(unwrapCompletionException(err2));
4151            } else {
4152              aggregator.append(stats);
4153            }
4154          }));
4155      }
4156      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4157        (ret, err3) -> {
4158          if (err3 != null) {
4159            future.completeExceptionally(unwrapCompletionException(err3));
4160          } else {
4161            future.complete(aggregator.sum());
4162          }
4163        });
4164    });
4165    return future;
4166  }
4167
4168  @Override
4169  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4170    boolean preserveSplits) {
4171    CompletableFuture<Void> future = new CompletableFuture<>();
4172    addListener(tableExists(tableName), (exist, err) -> {
4173      if (err != null) {
4174        future.completeExceptionally(err);
4175        return;
4176      }
4177      if (!exist) {
4178        future.completeExceptionally(new TableNotFoundException(tableName));
4179        return;
4180      }
4181      addListener(tableExists(newTableName), (exist1, err1) -> {
4182        if (err1 != null) {
4183          future.completeExceptionally(err1);
4184          return;
4185        }
4186        if (exist1) {
4187          future.completeExceptionally(new TableExistsException(newTableName));
4188          return;
4189        }
4190        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4191          if (err2 != null) {
4192            future.completeExceptionally(err2);
4193            return;
4194          }
4195          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4196          if (preserveSplits) {
4197            addListener(getTableSplits(tableName), (splits, err3) -> {
4198              if (err3 != null) {
4199                future.completeExceptionally(err3);
4200              } else {
4201                addListener(
4202                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4203                  (result, err4) -> {
4204                    if (err4 != null) {
4205                      future.completeExceptionally(err4);
4206                    } else {
4207                      future.complete(result);
4208                    }
4209                  });
4210              }
4211            });
4212          } else {
4213            addListener(createTable(newTableDesc), (result, err5) -> {
4214              if (err5 != null) {
4215                future.completeExceptionally(err5);
4216              } else {
4217                future.complete(result);
4218              }
4219            });
4220          }
4221        });
4222      });
4223    });
4224    return future;
4225  }
4226
4227  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4228    List<RegionInfo> hris) {
4229    return this.<CacheEvictionStats> newAdminCaller()
4230      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4231        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4232          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4233          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4234          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4235      .serverName(serverName).call();
4236  }
4237
4238  @Override
4239  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4240    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4241      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4242        Boolean> call(controller, stub,
4243          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4244          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4245          resp -> resp.getPreviousRpcThrottleEnabled()))
4246      .call();
4247    return future;
4248  }
4249
4250  @Override
4251  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4252    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4253      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4254        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4255          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4256          resp -> resp.getRpcThrottleEnabled()))
4257      .call();
4258    return future;
4259  }
4260
4261  @Override
4262  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4263    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4264      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4265        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4266          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4267            .build(),
4268          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4269          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4270      .call();
4271    return future;
4272  }
4273
4274  @Override
4275  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4276    return this.<Map<TableName, Long>> newMasterCaller()
4277      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4278        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4279          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4280          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4281          resp -> resp.getSizesList().stream().collect(Collectors
4282            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4283      .call();
4284  }
4285
4286  @Override
4287  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4288    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4289    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4290      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4291        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4292          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4293          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4294          resp -> resp.getSnapshotsList().stream()
4295            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4296              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4297      .serverName(serverName).call();
4298  }
4299
4300  private CompletableFuture<SpaceQuotaSnapshot>
4301    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4302    return this.<SpaceQuotaSnapshot> newMasterCaller()
4303      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4304        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4305          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4306      .call();
4307  }
4308
4309  @Override
4310  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4311    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4312      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4313      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4314  }
4315
4316  @Override
4317  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4318    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4319    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4320      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4321      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4322  }
4323
4324  @Override
4325  public CompletableFuture<Void> grant(UserPermission userPermission,
4326    boolean mergeExistingPermissions) {
4327    return this.<Void> newMasterCaller()
4328      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4329        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4330        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4331      .call();
4332  }
4333
4334  @Override
4335  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4336    return this.<Void> newMasterCaller()
4337      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4338        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4339        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4340      .call();
4341  }
4342
4343  @Override
4344  public CompletableFuture<List<UserPermission>>
4345    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4346    return this.<List<UserPermission>> newMasterCaller()
4347      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4348        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4349          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4350          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4351          resp -> resp.getUserPermissionList().stream()
4352            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4353            .collect(Collectors.toList())))
4354      .call();
4355  }
4356
4357  @Override
4358  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4359    List<Permission> permissions) {
4360    return this.<List<Boolean>> newMasterCaller()
4361      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4362        List<Boolean>> call(controller, stub,
4363          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4364          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4365          resp -> resp.getHasUserPermissionList()))
4366      .call();
4367  }
4368
4369  @Override
4370  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4371    return this.<Boolean> newMasterCaller()
4372      .action((controller, stub) -> this.call(controller, stub,
4373        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4374        MasterService.Interface::switchSnapshotCleanup,
4375        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4376      .call();
4377  }
4378
4379  @Override
4380  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4381    return this.<Boolean> newMasterCaller()
4382      .action((controller, stub) -> this.call(controller, stub,
4383        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4384        MasterService.Interface::isSnapshotCleanupEnabled,
4385        IsSnapshotCleanupEnabledResponse::getEnabled))
4386      .call();
4387  }
4388
4389  @Override
4390  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4391    return this.<Void> newMasterCaller()
4392      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4393        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4394        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4395      .call();
4396  }
4397
4398  @Override
4399  public CompletableFuture<Void> addRSGroup(String groupName) {
4400    return this.<Void> newMasterCaller()
4401      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4402        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4403        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4404      .call();
4405  }
4406
4407  @Override
4408  public CompletableFuture<Void> removeRSGroup(String groupName) {
4409    return this.<Void> newMasterCaller()
4410      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4411        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4412        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4413      .call();
4414  }
4415
4416  @Override
4417  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4418    BalanceRequest request) {
4419    return this.<BalanceResponse> newMasterCaller()
4420      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4421        BalanceResponse> call(controller, stub,
4422          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4423          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4424      .call();
4425  }
4426
4427  @Override
4428  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4429    return this.<List<RSGroupInfo>> newMasterCaller()
4430      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4431        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4432          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4433            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4434      .call();
4435  }
4436
4437  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4438    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4439    final String logType) {
4440    if (CollectionUtils.isEmpty(serverNames)) {
4441      return CompletableFuture.completedFuture(Collections.emptyList());
4442    }
4443    return CompletableFuture.supplyAsync(() -> serverNames
4444      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4445        filterParams, limit, logType))
4446      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4447  }
4448
4449  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4450    Map<String, Object> filterParams, int limit, String logType) {
4451    return this.<List<LogEntry>> newAdminCaller()
4452      .action((controller, stub) -> this.adminCall(controller, stub,
4453        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4454        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4455      .serverName(serverName).call();
4456  }
4457
4458  @Override
4459  public CompletableFuture<List<Boolean>>
4460    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4461    if (CollectionUtils.isEmpty(serverNames)) {
4462      return CompletableFuture.completedFuture(Collections.emptyList());
4463    }
4464    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4465      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4466    return convertToFutureOfList(clearSlowLogResponseList);
4467  }
4468
4469  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4470    return this.<Boolean> newAdminCaller()
4471      .action((controller, stub) -> this.adminCall(controller, stub,
4472        RequestConverter.buildClearSlowLogResponseRequest(),
4473        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4474      .serverName(serverName).call();
4475  }
4476
4477  private static <T> CompletableFuture<List<T>>
4478    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4479    CompletableFuture<Void> allDoneFuture =
4480      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4481    return allDoneFuture
4482      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4483  }
4484
4485  @Override
4486  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4487    return this.<List<TableName>> newMasterCaller()
4488      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4489        List<TableName>> call(controller, stub,
4490          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4491          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4492            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4493      .call();
4494  }
4495
4496  @Override
4497  public CompletableFuture<Pair<List<String>, List<TableName>>>
4498    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4499    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4500      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4501        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4502        Pair<List<String>, List<TableName>>> call(controller, stub,
4503          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4504            .build(),
4505          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4506          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4507            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4508      .call();
4509  }
4510
4511  @Override
4512  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4513    return this.<RSGroupInfo> newMasterCaller()
4514      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4515        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4516          GetRSGroupInfoOfServerRequest.newBuilder()
4517            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4518              .setPort(hostPort.getPort()).build())
4519            .build(),
4520          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4521          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4522      .call();
4523  }
4524
4525  @Override
4526  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4527    return this.<Void> newMasterCaller()
4528      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4529        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4530        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4531      .call();
4532  }
4533
4534  @Override
4535  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4536    CompletableFuture<Void> future = new CompletableFuture<>();
4537    for (TableName tableName : tables) {
4538      addListener(tableExists(tableName), (exist, err) -> {
4539        if (err != null) {
4540          future.completeExceptionally(err);
4541          return;
4542        }
4543        if (!exist) {
4544          future.completeExceptionally(new TableNotFoundException(tableName));
4545          return;
4546        }
4547      });
4548    }
4549    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4550      if (err != null) {
4551        future.completeExceptionally(err);
4552        return;
4553      }
4554      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4555        future.complete(null);
4556        return;
4557      }
4558      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4559      for (TableDescriptor td : tableDescriptions) {
4560        newTableDescriptors
4561          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4562      }
4563      addListener(
4564        CompletableFuture.allOf(
4565          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4566        (v, e) -> {
4567          if (e != null) {
4568            future.completeExceptionally(e);
4569          } else {
4570            future.complete(v);
4571          }
4572        });
4573    });
4574    return future;
4575  }
4576
4577  @Override
4578  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4579    return this.<RSGroupInfo> newMasterCaller()
4580      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4581        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4582          GetRSGroupInfoOfTableRequest.newBuilder()
4583            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4584          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4585          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4586      .call();
4587  }
4588
4589  @Override
4590  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4591    return this.<RSGroupInfo> newMasterCaller()
4592      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4593        RSGroupInfo> call(controller, stub,
4594          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4595          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4596          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4597      .call();
4598  }
4599
4600  @Override
4601  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4602    return this.<Void> newMasterCaller()
4603      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4604        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4605          .setNewRsgroupName(newName).build(),
4606        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4607      .call();
4608  }
4609
4610  @Override
4611  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4612    Map<String, String> configuration) {
4613    UpdateRSGroupConfigRequest.Builder request =
4614      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4615    if (configuration != null) {
4616      configuration.entrySet().forEach(e -> request.addConfiguration(
4617        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4618    }
4619    return this.<Void> newMasterCaller()
4620      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4621        Void> call(controller, stub, request.build(),
4622          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4623      .call();
4624  }
4625
4626  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4627    return this.<List<LogEntry>> newMasterCaller()
4628      .action((controller, stub) -> this.call(controller, stub,
4629        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4630        ProtobufUtil::toBalancerDecisionResponse))
4631      .call();
4632  }
4633
4634  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4635    return this.<List<LogEntry>> newMasterCaller()
4636      .action((controller, stub) -> this.call(controller, stub,
4637        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4638        ProtobufUtil::toBalancerRejectionResponse))
4639      .call();
4640  }
4641
4642  @Override
4643  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4644    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4645    if (logType == null || serverType == null) {
4646      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4647    }
4648    switch (logType) {
4649      case "SLOW_LOG":
4650      case "LARGE_LOG":
4651        if (ServerType.MASTER.equals(serverType)) {
4652          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4653        }
4654        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4655      case "BALANCER_DECISION":
4656        if (ServerType.REGION_SERVER.equals(serverType)) {
4657          throw new IllegalArgumentException(
4658            "Balancer Decision logs are not maintained by HRegionServer");
4659        }
4660        return getBalancerDecisions(limit);
4661      case "BALANCER_REJECTION":
4662        if (ServerType.REGION_SERVER.equals(serverType)) {
4663          throw new IllegalArgumentException(
4664            "Balancer Rejection logs are not maintained by HRegionServer");
4665        }
4666        return getBalancerRejections(limit);
4667      default:
4668        return CompletableFuture.completedFuture(Collections.emptyList());
4669    }
4670  }
4671
4672  @Override
4673  public CompletableFuture<Void> flushMasterStore() {
4674    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4675    return this.<Void> newMasterCaller()
4676      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4677        Void> call(controller, stub, request.build(),
4678          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4679      .call();
4680  }
4681
4682  @Override
4683  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4684    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4685    return this.<List<String>> newAdminCaller()
4686      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4687        List<String>> adminCall(controller, stub, request.build(),
4688          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4689          resp -> resp.getCachedFilesList()))
4690      .serverName(serverName).call();
4691  }
4692
4693  @Override
4694  public CompletableFuture<Void> restoreBackupSystemTable(String snapshotName) {
4695    MasterProtos.RestoreBackupSystemTableRequest request =
4696      MasterProtos.RestoreBackupSystemTableRequest.newBuilder().setSnapshotName(snapshotName)
4697        .build();
4698    return this.<MasterProtos.RestoreBackupSystemTableRequest,
4699      MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request,
4700        MasterService.Interface::restoreBackupSystemTable,
4701        MasterProtos.RestoreBackupSystemTableResponse::getProcId,
4702        new RestoreBackupSystemTableProcedureBiConsumer());
4703  }
4704
4705  private CompletableFuture<Long> internalRefershHFiles(RefreshHFilesRequest request) {
4706    return this.<Long> newMasterCaller()
4707      .action((controller, stub) -> this.<RefreshHFilesRequest, RefreshHFilesResponse, Long> call(
4708        controller, stub, request, MasterService.Interface::refreshHFiles,
4709        RefreshHFilesResponse::getProcId))
4710      .call();
4711  }
4712
4713  @Override
4714  public CompletableFuture<Long> refreshMeta() {
4715    RefreshMetaRequest.Builder request = RefreshMetaRequest.newBuilder();
4716    request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce());
4717    return this.<Long> newMasterCaller()
4718      .action((controller, stub) -> this.<RefreshMetaRequest, RefreshMetaResponse, Long> call(
4719        controller, stub, request.build(), MasterService.Interface::refreshMeta,
4720        RefreshMetaResponse::getProcId))
4721      .call();
4722  }
4723
4724  @Override
4725  public CompletableFuture<Long> refreshHFiles(final TableName tableName) {
4726    if (tableName.isSystemTable()) {
4727      LOG.warn("Refreshing HFiles for system table {} is not allowed", tableName.getNameAsString());
4728      throw new IllegalArgumentException(
4729        "Not allowed to refresh HFiles for system table '" + tableName.getNameAsString() + "'");
4730    }
4731    // Request builder
4732    RefreshHFilesRequest.Builder request = RefreshHFilesRequest.newBuilder();
4733    request.setTableName(ProtobufUtil.toProtoTableName(tableName));
4734    request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce());
4735    return internalRefershHFiles(request.build());
4736  }
4737
4738  @Override
4739  public CompletableFuture<Long> refreshHFiles(final String namespace) {
4740    if (
4741      namespace.equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)
4742        || namespace.equals(NamespaceDescriptor.BACKUP_NAMESPACE_NAME_STR)
4743    ) {
4744      LOG.warn("Refreshing HFiles for reserve namespace {} is not allowed", namespace);
4745      throw new IllegalArgumentException(
4746        "Not allowed to refresh HFiles for reserve namespace '" + namespace + "'");
4747    }
4748    // Request builder
4749    RefreshHFilesRequest.Builder request = RefreshHFilesRequest.newBuilder();
4750    request.setNamespace(namespace);
4751    request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce());
4752    return internalRefershHFiles(request.build());
4753  }
4754
4755  @Override
4756  public CompletableFuture<Long> refreshHFiles() {
4757    // Request builder
4758    RefreshHFilesRequest.Builder request = RefreshHFilesRequest.newBuilder();
4759    // Set nonce
4760    request.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce());
4761    return internalRefershHFiles(request.build());
4762  }
4763}